1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import functools 7import gc 8import io 9import random 10import re 11import sys 12import textwrap 13import traceback 14import unittest 15from unittest import mock 16from types import GenericAlias 17 18import asyncio 19from asyncio import coroutines 20from asyncio import futures 21from asyncio import tasks 22from test.test_asyncio import utils as test_utils 23from test import support 24from test.support.script_helper import assert_python_ok 25 26 27def tearDownModule(): 28 asyncio.set_event_loop_policy(None) 29 30 31async def coroutine_function(): 32 pass 33 34 35def format_coroutine(qualname, state, src, source_traceback, generator=False): 36 if generator: 37 state = '%s' % state 38 else: 39 state = '%s, defined' % state 40 if source_traceback is not None: 41 frame = source_traceback[-1] 42 return ('coro=<%s() %s at %s> created at %s:%s' 43 % (qualname, state, src, frame[0], frame[1])) 44 else: 45 return 'coro=<%s() %s at %s>' % (qualname, state, src) 46 47 48def get_innermost_context(exc): 49 """ 50 Return information about the innermost exception context in the chain. 51 """ 52 depth = 0 53 while True: 54 context = exc.__context__ 55 if context is None: 56 break 57 58 exc = context 59 depth += 1 60 61 return (type(exc), exc.args, depth) 62 63 64class Dummy: 65 66 def __repr__(self): 67 return '<Dummy>' 68 69 def __call__(self, *args): 70 pass 71 72 73class CoroLikeObject: 74 def send(self, v): 75 raise StopIteration(42) 76 77 def throw(self, *exc): 78 pass 79 80 def close(self): 81 pass 82 83 def __await__(self): 84 return self 85 86 87class BaseTaskTests: 88 89 Task = None 90 Future = None 91 92 def new_task(self, loop, coro, name='TestTask', context=None): 93 return self.__class__.Task(coro, loop=loop, name=name, context=context) 94 95 def new_future(self, loop): 96 return self.__class__.Future(loop=loop) 97 98 def setUp(self): 99 super().setUp() 100 self.loop = self.new_test_loop() 101 self.loop.set_task_factory(self.new_task) 102 self.loop.create_future = lambda: self.new_future(self.loop) 103 104 def test_generic_alias(self): 105 task = self.__class__.Task[str] 106 self.assertEqual(task.__args__, (str,)) 107 self.assertIsInstance(task, GenericAlias) 108 109 def test_task_cancel_message_getter(self): 110 async def coro(): 111 pass 112 t = self.new_task(self.loop, coro()) 113 self.assertTrue(hasattr(t, '_cancel_message')) 114 self.assertEqual(t._cancel_message, None) 115 116 t.cancel('my message') 117 self.assertEqual(t._cancel_message, 'my message') 118 119 with self.assertRaises(asyncio.CancelledError) as cm: 120 self.loop.run_until_complete(t) 121 122 self.assertEqual('my message', cm.exception.args[0]) 123 124 def test_task_cancel_message_setter(self): 125 async def coro(): 126 pass 127 t = self.new_task(self.loop, coro()) 128 t.cancel('my message') 129 t._cancel_message = 'my new message' 130 self.assertEqual(t._cancel_message, 'my new message') 131 132 with self.assertRaises(asyncio.CancelledError) as cm: 133 self.loop.run_until_complete(t) 134 135 self.assertEqual('my new message', cm.exception.args[0]) 136 137 def test_task_del_collect(self): 138 class Evil: 139 def __del__(self): 140 gc.collect() 141 142 async def run(): 143 return Evil() 144 145 self.loop.run_until_complete( 146 asyncio.gather(*[ 147 self.new_task(self.loop, run()) for _ in range(100) 148 ])) 149 150 def test_other_loop_future(self): 151 other_loop = asyncio.new_event_loop() 152 fut = self.new_future(other_loop) 153 154 async def run(fut): 155 await fut 156 157 try: 158 with self.assertRaisesRegex(RuntimeError, 159 r'Task .* got Future .* attached'): 160 self.loop.run_until_complete(run(fut)) 161 finally: 162 other_loop.close() 163 164 def test_task_awaits_on_itself(self): 165 166 async def test(): 167 await task 168 169 task = asyncio.ensure_future(test(), loop=self.loop) 170 171 with self.assertRaisesRegex(RuntimeError, 172 'Task cannot await on itself'): 173 self.loop.run_until_complete(task) 174 175 def test_task_class(self): 176 async def notmuch(): 177 return 'ok' 178 t = self.new_task(self.loop, notmuch()) 179 self.loop.run_until_complete(t) 180 self.assertTrue(t.done()) 181 self.assertEqual(t.result(), 'ok') 182 self.assertIs(t._loop, self.loop) 183 self.assertIs(t.get_loop(), self.loop) 184 185 loop = asyncio.new_event_loop() 186 self.set_event_loop(loop) 187 t = self.new_task(loop, notmuch()) 188 self.assertIs(t._loop, loop) 189 loop.run_until_complete(t) 190 loop.close() 191 192 def test_ensure_future_coroutine(self): 193 async def notmuch(): 194 return 'ok' 195 t = asyncio.ensure_future(notmuch(), loop=self.loop) 196 self.assertIs(t._loop, self.loop) 197 self.loop.run_until_complete(t) 198 self.assertTrue(t.done()) 199 self.assertEqual(t.result(), 'ok') 200 201 a = notmuch() 202 self.addCleanup(a.close) 203 with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 204 asyncio.ensure_future(a) 205 206 async def test(): 207 return asyncio.ensure_future(notmuch()) 208 t = self.loop.run_until_complete(test()) 209 self.assertIs(t._loop, self.loop) 210 self.loop.run_until_complete(t) 211 self.assertTrue(t.done()) 212 self.assertEqual(t.result(), 'ok') 213 214 # Deprecated in 3.10, undeprecated in 3.11.1 215 asyncio.set_event_loop(self.loop) 216 self.addCleanup(asyncio.set_event_loop, None) 217 t = asyncio.ensure_future(notmuch()) 218 self.assertIs(t._loop, self.loop) 219 self.loop.run_until_complete(t) 220 self.assertTrue(t.done()) 221 self.assertEqual(t.result(), 'ok') 222 223 def test_ensure_future_future(self): 224 f_orig = self.new_future(self.loop) 225 f_orig.set_result('ko') 226 227 f = asyncio.ensure_future(f_orig) 228 self.loop.run_until_complete(f) 229 self.assertTrue(f.done()) 230 self.assertEqual(f.result(), 'ko') 231 self.assertIs(f, f_orig) 232 233 loop = asyncio.new_event_loop() 234 self.set_event_loop(loop) 235 236 with self.assertRaises(ValueError): 237 f = asyncio.ensure_future(f_orig, loop=loop) 238 239 loop.close() 240 241 f = asyncio.ensure_future(f_orig, loop=self.loop) 242 self.assertIs(f, f_orig) 243 244 def test_ensure_future_task(self): 245 async def notmuch(): 246 return 'ok' 247 t_orig = self.new_task(self.loop, notmuch()) 248 t = asyncio.ensure_future(t_orig) 249 self.loop.run_until_complete(t) 250 self.assertTrue(t.done()) 251 self.assertEqual(t.result(), 'ok') 252 self.assertIs(t, t_orig) 253 254 loop = asyncio.new_event_loop() 255 self.set_event_loop(loop) 256 257 with self.assertRaises(ValueError): 258 t = asyncio.ensure_future(t_orig, loop=loop) 259 260 loop.close() 261 262 t = asyncio.ensure_future(t_orig, loop=self.loop) 263 self.assertIs(t, t_orig) 264 265 def test_ensure_future_awaitable(self): 266 class Aw: 267 def __init__(self, coro): 268 self.coro = coro 269 def __await__(self): 270 return self.coro.__await__() 271 272 async def coro(): 273 return 'ok' 274 275 loop = asyncio.new_event_loop() 276 self.set_event_loop(loop) 277 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 278 loop.run_until_complete(fut) 279 self.assertEqual(fut.result(), 'ok') 280 281 def test_ensure_future_neither(self): 282 with self.assertRaises(TypeError): 283 asyncio.ensure_future('ok') 284 285 def test_ensure_future_error_msg(self): 286 loop = asyncio.new_event_loop() 287 f = self.new_future(self.loop) 288 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 289 'different loop than the one specified as ' 290 'the loop argument'): 291 asyncio.ensure_future(f, loop=loop) 292 loop.close() 293 294 def test_get_stack(self): 295 T = None 296 297 async def foo(): 298 await bar() 299 300 async def bar(): 301 # test get_stack() 302 f = T.get_stack(limit=1) 303 try: 304 self.assertEqual(f[0].f_code.co_name, 'foo') 305 finally: 306 f = None 307 308 # test print_stack() 309 file = io.StringIO() 310 T.print_stack(limit=1, file=file) 311 file.seek(0) 312 tb = file.read() 313 self.assertRegex(tb, r'foo\(\) running') 314 315 async def runner(): 316 nonlocal T 317 T = asyncio.ensure_future(foo(), loop=self.loop) 318 await T 319 320 self.loop.run_until_complete(runner()) 321 322 def test_task_repr(self): 323 self.loop.set_debug(False) 324 325 async def notmuch(): 326 return 'abc' 327 328 # test coroutine function 329 self.assertEqual(notmuch.__name__, 'notmuch') 330 self.assertRegex(notmuch.__qualname__, 331 r'\w+.test_task_repr.<locals>.notmuch') 332 self.assertEqual(notmuch.__module__, __name__) 333 334 filename, lineno = test_utils.get_function_source(notmuch) 335 src = "%s:%s" % (filename, lineno) 336 337 # test coroutine object 338 gen = notmuch() 339 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 340 self.assertEqual(gen.__name__, 'notmuch') 341 self.assertEqual(gen.__qualname__, coro_qualname) 342 343 # test pending Task 344 t = self.new_task(self.loop, gen) 345 t.add_done_callback(Dummy()) 346 347 coro = format_coroutine(coro_qualname, 'running', src, 348 t._source_traceback, generator=True) 349 self.assertEqual(repr(t), 350 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 351 352 # test cancelling Task 353 t.cancel() # Does not take immediate effect! 354 self.assertEqual(repr(t), 355 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 356 357 # test cancelled Task 358 self.assertRaises(asyncio.CancelledError, 359 self.loop.run_until_complete, t) 360 coro = format_coroutine(coro_qualname, 'done', src, 361 t._source_traceback) 362 self.assertEqual(repr(t), 363 "<Task cancelled name='TestTask' %s>" % coro) 364 365 # test finished Task 366 t = self.new_task(self.loop, notmuch()) 367 self.loop.run_until_complete(t) 368 coro = format_coroutine(coro_qualname, 'done', src, 369 t._source_traceback) 370 self.assertEqual(repr(t), 371 "<Task finished name='TestTask' %s result='abc'>" % coro) 372 373 def test_task_repr_autogenerated(self): 374 async def notmuch(): 375 return 123 376 377 t1 = self.new_task(self.loop, notmuch(), None) 378 t2 = self.new_task(self.loop, notmuch(), None) 379 self.assertNotEqual(repr(t1), repr(t2)) 380 381 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 382 self.assertIsNotNone(match1) 383 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 384 self.assertIsNotNone(match2) 385 386 # Autogenerated task names should have monotonically increasing numbers 387 self.assertLess(int(match1.group(1)), int(match2.group(1))) 388 self.loop.run_until_complete(t1) 389 self.loop.run_until_complete(t2) 390 391 def test_task_repr_name_not_str(self): 392 async def notmuch(): 393 return 123 394 395 t = self.new_task(self.loop, notmuch()) 396 t.set_name({6}) 397 self.assertEqual(t.get_name(), '{6}') 398 self.loop.run_until_complete(t) 399 400 def test_task_repr_wait_for(self): 401 self.loop.set_debug(False) 402 403 async def wait_for(fut): 404 return await fut 405 406 fut = self.new_future(self.loop) 407 task = self.new_task(self.loop, wait_for(fut)) 408 test_utils.run_briefly(self.loop) 409 self.assertRegex(repr(task), 410 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 411 412 fut.set_result(None) 413 self.loop.run_until_complete(task) 414 415 def test_task_basics(self): 416 417 async def outer(): 418 a = await inner1() 419 b = await inner2() 420 return a+b 421 422 async def inner1(): 423 return 42 424 425 async def inner2(): 426 return 1000 427 428 t = outer() 429 self.assertEqual(self.loop.run_until_complete(t), 1042) 430 431 def test_exception_chaining_after_await(self): 432 # Test that when awaiting on a task when an exception is already 433 # active, if the task raises an exception it will be chained 434 # with the original. 435 loop = asyncio.new_event_loop() 436 self.set_event_loop(loop) 437 438 async def raise_error(): 439 raise ValueError 440 441 async def run(): 442 try: 443 raise KeyError(3) 444 except Exception as exc: 445 task = self.new_task(loop, raise_error()) 446 try: 447 await task 448 except Exception as exc: 449 self.assertEqual(type(exc), ValueError) 450 chained = exc.__context__ 451 self.assertEqual((type(chained), chained.args), 452 (KeyError, (3,))) 453 454 try: 455 task = self.new_task(loop, run()) 456 loop.run_until_complete(task) 457 finally: 458 loop.close() 459 460 def test_exception_chaining_after_await_with_context_cycle(self): 461 # Check trying to create an exception context cycle: 462 # https://bugs.python.org/issue40696 463 has_cycle = None 464 loop = asyncio.new_event_loop() 465 self.set_event_loop(loop) 466 467 async def process_exc(exc): 468 raise exc 469 470 async def run(): 471 nonlocal has_cycle 472 try: 473 raise KeyError('a') 474 except Exception as exc: 475 task = self.new_task(loop, process_exc(exc)) 476 try: 477 await task 478 except BaseException as exc: 479 has_cycle = (exc is exc.__context__) 480 # Prevent a hang if has_cycle is True. 481 exc.__context__ = None 482 483 try: 484 task = self.new_task(loop, run()) 485 loop.run_until_complete(task) 486 finally: 487 loop.close() 488 # This also distinguishes from the initial has_cycle=None. 489 self.assertEqual(has_cycle, False) 490 491 492 def test_cancelling(self): 493 loop = asyncio.new_event_loop() 494 495 async def task(): 496 await asyncio.sleep(10) 497 498 try: 499 t = self.new_task(loop, task()) 500 self.assertFalse(t.cancelling()) 501 self.assertNotIn(" cancelling ", repr(t)) 502 self.assertTrue(t.cancel()) 503 self.assertTrue(t.cancelling()) 504 self.assertIn(" cancelling ", repr(t)) 505 506 # Since we commented out two lines from Task.cancel(), 507 # this t.cancel() call now returns True. 508 # self.assertFalse(t.cancel()) 509 self.assertTrue(t.cancel()) 510 511 with self.assertRaises(asyncio.CancelledError): 512 loop.run_until_complete(t) 513 finally: 514 loop.close() 515 516 def test_uncancel_basic(self): 517 loop = asyncio.new_event_loop() 518 519 async def task(): 520 try: 521 await asyncio.sleep(10) 522 except asyncio.CancelledError: 523 asyncio.current_task().uncancel() 524 await asyncio.sleep(10) 525 526 try: 527 t = self.new_task(loop, task()) 528 loop.run_until_complete(asyncio.sleep(0.01)) 529 530 # Cancel first sleep 531 self.assertTrue(t.cancel()) 532 self.assertIn(" cancelling ", repr(t)) 533 self.assertEqual(t.cancelling(), 1) 534 self.assertFalse(t.cancelled()) # Task is still not complete 535 loop.run_until_complete(asyncio.sleep(0.01)) 536 537 # after .uncancel() 538 self.assertNotIn(" cancelling ", repr(t)) 539 self.assertEqual(t.cancelling(), 0) 540 self.assertFalse(t.cancelled()) # Task is still not complete 541 542 # Cancel second sleep 543 self.assertTrue(t.cancel()) 544 self.assertEqual(t.cancelling(), 1) 545 self.assertFalse(t.cancelled()) # Task is still not complete 546 with self.assertRaises(asyncio.CancelledError): 547 loop.run_until_complete(t) 548 self.assertTrue(t.cancelled()) # Finally, task complete 549 self.assertTrue(t.done()) 550 551 # uncancel is no longer effective after the task is complete 552 t.uncancel() 553 self.assertTrue(t.cancelled()) 554 self.assertTrue(t.done()) 555 finally: 556 loop.close() 557 558 def test_uncancel_structured_blocks(self): 559 # This test recreates the following high-level structure using uncancel():: 560 # 561 # async def make_request_with_timeout(): 562 # try: 563 # async with asyncio.timeout(1): 564 # # Structured block affected by the timeout: 565 # await make_request() 566 # await make_another_request() 567 # except TimeoutError: 568 # pass # There was a timeout 569 # # Outer code not affected by the timeout: 570 # await unrelated_code() 571 572 loop = asyncio.new_event_loop() 573 574 async def make_request_with_timeout(*, sleep: float, timeout: float): 575 task = asyncio.current_task() 576 loop = task.get_loop() 577 578 timed_out = False 579 structured_block_finished = False 580 outer_code_reached = False 581 582 def on_timeout(): 583 nonlocal timed_out 584 timed_out = True 585 task.cancel() 586 587 timeout_handle = loop.call_later(timeout, on_timeout) 588 try: 589 try: 590 # Structured block affected by the timeout 591 await asyncio.sleep(sleep) 592 structured_block_finished = True 593 finally: 594 timeout_handle.cancel() 595 if ( 596 timed_out 597 and task.uncancel() == 0 598 and sys.exc_info()[0] is asyncio.CancelledError 599 ): 600 # Note the five rules that are needed here to satisfy proper 601 # uncancellation: 602 # 603 # 1. handle uncancellation in a `finally:` block to allow for 604 # plain returns; 605 # 2. our `timed_out` flag is set, meaning that it was our event 606 # that triggered the need to uncancel the task, regardless of 607 # what exception is raised; 608 # 3. we can call `uncancel()` because *we* called `cancel()` 609 # before; 610 # 4. we call `uncancel()` but we only continue converting the 611 # CancelledError to TimeoutError if `uncancel()` caused the 612 # cancellation request count go down to 0. We need to look 613 # at the counter vs having a simple boolean flag because our 614 # code might have been nested (think multiple timeouts). See 615 # commit 7fce1063b6e5a366f8504e039a8ccdd6944625cd for 616 # details. 617 # 5. we only convert CancelledError to TimeoutError; for other 618 # exceptions raised due to the cancellation (like 619 # a ConnectionLostError from a database client), simply 620 # propagate them. 621 # 622 # Those checks need to take place in this exact order to make 623 # sure the `cancelling()` counter always stays in sync. 624 # 625 # Additionally, the original stimulus to `cancel()` the task 626 # needs to be unscheduled to avoid re-cancelling the task later. 627 # Here we do it by cancelling `timeout_handle` in the `finally:` 628 # block. 629 raise TimeoutError 630 except TimeoutError: 631 self.assertTrue(timed_out) 632 633 # Outer code not affected by the timeout: 634 outer_code_reached = True 635 await asyncio.sleep(0) 636 return timed_out, structured_block_finished, outer_code_reached 637 638 try: 639 # Test which timed out. 640 t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1)) 641 timed_out, structured_block_finished, outer_code_reached = ( 642 loop.run_until_complete(t1) 643 ) 644 self.assertTrue(timed_out) 645 self.assertFalse(structured_block_finished) # it was cancelled 646 self.assertTrue(outer_code_reached) # task got uncancelled after leaving 647 # the structured block and continued until 648 # completion 649 self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task 650 651 # Test which did not time out. 652 t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0)) 653 timed_out, structured_block_finished, outer_code_reached = ( 654 loop.run_until_complete(t2) 655 ) 656 self.assertFalse(timed_out) 657 self.assertTrue(structured_block_finished) 658 self.assertTrue(outer_code_reached) 659 self.assertEqual(t2.cancelling(), 0) 660 finally: 661 loop.close() 662 663 def test_cancel(self): 664 665 def gen(): 666 when = yield 667 self.assertAlmostEqual(10.0, when) 668 yield 0 669 670 loop = self.new_test_loop(gen) 671 672 async def task(): 673 await asyncio.sleep(10.0) 674 return 12 675 676 t = self.new_task(loop, task()) 677 loop.call_soon(t.cancel) 678 with self.assertRaises(asyncio.CancelledError): 679 loop.run_until_complete(t) 680 self.assertTrue(t.done()) 681 self.assertTrue(t.cancelled()) 682 self.assertFalse(t.cancel()) 683 684 def test_cancel_with_message_then_future_result(self): 685 # Test Future.result() after calling cancel() with a message. 686 cases = [ 687 ((), ()), 688 ((None,), ()), 689 (('my message',), ('my message',)), 690 # Non-string values should roundtrip. 691 ((5,), (5,)), 692 ] 693 for cancel_args, expected_args in cases: 694 with self.subTest(cancel_args=cancel_args): 695 loop = asyncio.new_event_loop() 696 self.set_event_loop(loop) 697 698 async def sleep(): 699 await asyncio.sleep(10) 700 701 async def coro(): 702 task = self.new_task(loop, sleep()) 703 await asyncio.sleep(0) 704 task.cancel(*cancel_args) 705 done, pending = await asyncio.wait([task]) 706 task.result() 707 708 task = self.new_task(loop, coro()) 709 with self.assertRaises(asyncio.CancelledError) as cm: 710 loop.run_until_complete(task) 711 exc = cm.exception 712 self.assertEqual(exc.args, expected_args) 713 714 actual = get_innermost_context(exc) 715 self.assertEqual(actual, 716 (asyncio.CancelledError, expected_args, 0)) 717 718 def test_cancel_with_message_then_future_exception(self): 719 # Test Future.exception() after calling cancel() with a message. 720 cases = [ 721 ((), ()), 722 ((None,), ()), 723 (('my message',), ('my message',)), 724 # Non-string values should roundtrip. 725 ((5,), (5,)), 726 ] 727 for cancel_args, expected_args in cases: 728 with self.subTest(cancel_args=cancel_args): 729 loop = asyncio.new_event_loop() 730 self.set_event_loop(loop) 731 732 async def sleep(): 733 await asyncio.sleep(10) 734 735 async def coro(): 736 task = self.new_task(loop, sleep()) 737 await asyncio.sleep(0) 738 task.cancel(*cancel_args) 739 done, pending = await asyncio.wait([task]) 740 task.exception() 741 742 task = self.new_task(loop, coro()) 743 with self.assertRaises(asyncio.CancelledError) as cm: 744 loop.run_until_complete(task) 745 exc = cm.exception 746 self.assertEqual(exc.args, expected_args) 747 748 actual = get_innermost_context(exc) 749 self.assertEqual(actual, 750 (asyncio.CancelledError, expected_args, 0)) 751 752 def test_cancellation_exception_context(self): 753 loop = asyncio.new_event_loop() 754 self.set_event_loop(loop) 755 fut = loop.create_future() 756 757 async def sleep(): 758 fut.set_result(None) 759 await asyncio.sleep(10) 760 761 async def coro(): 762 inner_task = self.new_task(loop, sleep()) 763 await fut 764 loop.call_soon(inner_task.cancel, 'msg') 765 try: 766 await inner_task 767 except asyncio.CancelledError as ex: 768 raise ValueError("cancelled") from ex 769 770 task = self.new_task(loop, coro()) 771 with self.assertRaises(ValueError) as cm: 772 loop.run_until_complete(task) 773 exc = cm.exception 774 self.assertEqual(exc.args, ('cancelled',)) 775 776 actual = get_innermost_context(exc) 777 self.assertEqual(actual, 778 (asyncio.CancelledError, ('msg',), 1)) 779 780 def test_cancel_with_message_before_starting_task(self): 781 loop = asyncio.new_event_loop() 782 self.set_event_loop(loop) 783 784 async def sleep(): 785 await asyncio.sleep(10) 786 787 async def coro(): 788 task = self.new_task(loop, sleep()) 789 # We deliberately leave out the sleep here. 790 task.cancel('my message') 791 done, pending = await asyncio.wait([task]) 792 task.exception() 793 794 task = self.new_task(loop, coro()) 795 with self.assertRaises(asyncio.CancelledError) as cm: 796 loop.run_until_complete(task) 797 exc = cm.exception 798 self.assertEqual(exc.args, ('my message',)) 799 800 actual = get_innermost_context(exc) 801 self.assertEqual(actual, 802 (asyncio.CancelledError, ('my message',), 0)) 803 804 def test_cancel_yield(self): 805 async def task(): 806 await asyncio.sleep(0) 807 await asyncio.sleep(0) 808 return 12 809 810 t = self.new_task(self.loop, task()) 811 test_utils.run_briefly(self.loop) # start coro 812 t.cancel() 813 self.assertRaises( 814 asyncio.CancelledError, self.loop.run_until_complete, t) 815 self.assertTrue(t.done()) 816 self.assertTrue(t.cancelled()) 817 self.assertFalse(t.cancel()) 818 819 def test_cancel_inner_future(self): 820 f = self.new_future(self.loop) 821 822 async def task(): 823 await f 824 return 12 825 826 t = self.new_task(self.loop, task()) 827 test_utils.run_briefly(self.loop) # start task 828 f.cancel() 829 with self.assertRaises(asyncio.CancelledError): 830 self.loop.run_until_complete(t) 831 self.assertTrue(f.cancelled()) 832 self.assertTrue(t.cancelled()) 833 834 def test_cancel_both_task_and_inner_future(self): 835 f = self.new_future(self.loop) 836 837 async def task(): 838 await f 839 return 12 840 841 t = self.new_task(self.loop, task()) 842 test_utils.run_briefly(self.loop) 843 844 f.cancel() 845 t.cancel() 846 847 with self.assertRaises(asyncio.CancelledError): 848 self.loop.run_until_complete(t) 849 850 self.assertTrue(t.done()) 851 self.assertTrue(f.cancelled()) 852 self.assertTrue(t.cancelled()) 853 854 def test_cancel_task_catching(self): 855 fut1 = self.new_future(self.loop) 856 fut2 = self.new_future(self.loop) 857 858 async def task(): 859 await fut1 860 try: 861 await fut2 862 except asyncio.CancelledError: 863 return 42 864 865 t = self.new_task(self.loop, task()) 866 test_utils.run_briefly(self.loop) 867 self.assertIs(t._fut_waiter, fut1) # White-box test. 868 fut1.set_result(None) 869 test_utils.run_briefly(self.loop) 870 self.assertIs(t._fut_waiter, fut2) # White-box test. 871 t.cancel() 872 self.assertTrue(fut2.cancelled()) 873 res = self.loop.run_until_complete(t) 874 self.assertEqual(res, 42) 875 self.assertFalse(t.cancelled()) 876 877 def test_cancel_task_ignoring(self): 878 fut1 = self.new_future(self.loop) 879 fut2 = self.new_future(self.loop) 880 fut3 = self.new_future(self.loop) 881 882 async def task(): 883 await fut1 884 try: 885 await fut2 886 except asyncio.CancelledError: 887 pass 888 res = await fut3 889 return res 890 891 t = self.new_task(self.loop, task()) 892 test_utils.run_briefly(self.loop) 893 self.assertIs(t._fut_waiter, fut1) # White-box test. 894 fut1.set_result(None) 895 test_utils.run_briefly(self.loop) 896 self.assertIs(t._fut_waiter, fut2) # White-box test. 897 t.cancel() 898 self.assertTrue(fut2.cancelled()) 899 test_utils.run_briefly(self.loop) 900 self.assertIs(t._fut_waiter, fut3) # White-box test. 901 fut3.set_result(42) 902 res = self.loop.run_until_complete(t) 903 self.assertEqual(res, 42) 904 self.assertFalse(fut3.cancelled()) 905 self.assertFalse(t.cancelled()) 906 907 def test_cancel_current_task(self): 908 loop = asyncio.new_event_loop() 909 self.set_event_loop(loop) 910 911 async def task(): 912 t.cancel() 913 self.assertTrue(t._must_cancel) # White-box test. 914 # The sleep should be cancelled immediately. 915 await asyncio.sleep(100) 916 return 12 917 918 t = self.new_task(loop, task()) 919 self.assertFalse(t.cancelled()) 920 self.assertRaises( 921 asyncio.CancelledError, loop.run_until_complete, t) 922 self.assertTrue(t.done()) 923 self.assertTrue(t.cancelled()) 924 self.assertFalse(t._must_cancel) # White-box test. 925 self.assertFalse(t.cancel()) 926 927 def test_cancel_at_end(self): 928 """coroutine end right after task is cancelled""" 929 loop = asyncio.new_event_loop() 930 self.set_event_loop(loop) 931 932 async def task(): 933 t.cancel() 934 self.assertTrue(t._must_cancel) # White-box test. 935 return 12 936 937 t = self.new_task(loop, task()) 938 self.assertFalse(t.cancelled()) 939 self.assertRaises( 940 asyncio.CancelledError, loop.run_until_complete, t) 941 self.assertTrue(t.done()) 942 self.assertTrue(t.cancelled()) 943 self.assertFalse(t._must_cancel) # White-box test. 944 self.assertFalse(t.cancel()) 945 946 def test_cancel_awaited_task(self): 947 # This tests for a relatively rare condition when 948 # a task cancellation is requested for a task which is not 949 # currently blocked, such as a task cancelling itself. 950 # In this situation we must ensure that whatever next future 951 # or task the cancelled task blocks on is cancelled correctly 952 # as well. See also bpo-34872. 953 loop = asyncio.new_event_loop() 954 self.addCleanup(lambda: loop.close()) 955 956 task = nested_task = None 957 fut = self.new_future(loop) 958 959 async def nested(): 960 await fut 961 962 async def coro(): 963 nonlocal nested_task 964 # Create a sub-task and wait for it to run. 965 nested_task = self.new_task(loop, nested()) 966 await asyncio.sleep(0) 967 968 # Request the current task to be cancelled. 969 task.cancel() 970 # Block on the nested task, which should be immediately 971 # cancelled. 972 await nested_task 973 974 task = self.new_task(loop, coro()) 975 with self.assertRaises(asyncio.CancelledError): 976 loop.run_until_complete(task) 977 978 self.assertTrue(task.cancelled()) 979 self.assertTrue(nested_task.cancelled()) 980 self.assertTrue(fut.cancelled()) 981 982 def assert_text_contains(self, text, substr): 983 if substr not in text: 984 raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<') 985 986 def test_cancel_traceback_for_future_result(self): 987 # When calling Future.result() on a cancelled task, check that the 988 # line of code that was interrupted is included in the traceback. 989 loop = asyncio.new_event_loop() 990 self.set_event_loop(loop) 991 992 async def nested(): 993 # This will get cancelled immediately. 994 await asyncio.sleep(10) 995 996 async def coro(): 997 task = self.new_task(loop, nested()) 998 await asyncio.sleep(0) 999 task.cancel() 1000 await task # search target 1001 1002 task = self.new_task(loop, coro()) 1003 try: 1004 loop.run_until_complete(task) 1005 except asyncio.CancelledError: 1006 tb = traceback.format_exc() 1007 self.assert_text_contains(tb, "await asyncio.sleep(10)") 1008 # The intermediate await should also be included. 1009 self.assert_text_contains(tb, "await task # search target") 1010 else: 1011 self.fail('CancelledError did not occur') 1012 1013 def test_cancel_traceback_for_future_exception(self): 1014 # When calling Future.exception() on a cancelled task, check that the 1015 # line of code that was interrupted is included in the traceback. 1016 loop = asyncio.new_event_loop() 1017 self.set_event_loop(loop) 1018 1019 async def nested(): 1020 # This will get cancelled immediately. 1021 await asyncio.sleep(10) 1022 1023 async def coro(): 1024 task = self.new_task(loop, nested()) 1025 await asyncio.sleep(0) 1026 task.cancel() 1027 done, pending = await asyncio.wait([task]) 1028 task.exception() # search target 1029 1030 task = self.new_task(loop, coro()) 1031 try: 1032 loop.run_until_complete(task) 1033 except asyncio.CancelledError: 1034 tb = traceback.format_exc() 1035 self.assert_text_contains(tb, "await asyncio.sleep(10)") 1036 # The intermediate await should also be included. 1037 self.assert_text_contains(tb, 1038 "task.exception() # search target") 1039 else: 1040 self.fail('CancelledError did not occur') 1041 1042 def test_stop_while_run_in_complete(self): 1043 1044 def gen(): 1045 when = yield 1046 self.assertAlmostEqual(0.1, when) 1047 when = yield 0.1 1048 self.assertAlmostEqual(0.2, when) 1049 when = yield 0.1 1050 self.assertAlmostEqual(0.3, when) 1051 yield 0.1 1052 1053 loop = self.new_test_loop(gen) 1054 1055 x = 0 1056 1057 async def task(): 1058 nonlocal x 1059 while x < 10: 1060 await asyncio.sleep(0.1) 1061 x += 1 1062 if x == 2: 1063 loop.stop() 1064 1065 t = self.new_task(loop, task()) 1066 with self.assertRaises(RuntimeError) as cm: 1067 loop.run_until_complete(t) 1068 self.assertEqual(str(cm.exception), 1069 'Event loop stopped before Future completed.') 1070 self.assertFalse(t.done()) 1071 self.assertEqual(x, 2) 1072 self.assertAlmostEqual(0.3, loop.time()) 1073 1074 t.cancel() 1075 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 1076 1077 def test_log_traceback(self): 1078 async def coro(): 1079 pass 1080 1081 task = self.new_task(self.loop, coro()) 1082 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 1083 task._log_traceback = True 1084 self.loop.run_until_complete(task) 1085 1086 def test_wait(self): 1087 1088 def gen(): 1089 when = yield 1090 self.assertAlmostEqual(0.1, when) 1091 when = yield 0 1092 self.assertAlmostEqual(0.15, when) 1093 yield 0.15 1094 1095 loop = self.new_test_loop(gen) 1096 1097 a = self.new_task(loop, asyncio.sleep(0.1)) 1098 b = self.new_task(loop, asyncio.sleep(0.15)) 1099 1100 async def foo(): 1101 done, pending = await asyncio.wait([b, a]) 1102 self.assertEqual(done, set([a, b])) 1103 self.assertEqual(pending, set()) 1104 return 42 1105 1106 res = loop.run_until_complete(self.new_task(loop, foo())) 1107 self.assertEqual(res, 42) 1108 self.assertAlmostEqual(0.15, loop.time()) 1109 1110 # Doing it again should take no time and exercise a different path. 1111 res = loop.run_until_complete(self.new_task(loop, foo())) 1112 self.assertAlmostEqual(0.15, loop.time()) 1113 self.assertEqual(res, 42) 1114 1115 def test_wait_duplicate_coroutines(self): 1116 1117 async def coro(s): 1118 return s 1119 c = self.loop.create_task(coro('test')) 1120 task = self.new_task( 1121 self.loop, 1122 asyncio.wait([c, c, self.loop.create_task(coro('spam'))])) 1123 1124 done, pending = self.loop.run_until_complete(task) 1125 1126 self.assertFalse(pending) 1127 self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) 1128 1129 def test_wait_errors(self): 1130 self.assertRaises( 1131 ValueError, self.loop.run_until_complete, 1132 asyncio.wait(set())) 1133 1134 # -1 is an invalid return_when value 1135 sleep_coro = asyncio.sleep(10.0) 1136 wait_coro = asyncio.wait([sleep_coro], return_when=-1) 1137 self.assertRaises(ValueError, 1138 self.loop.run_until_complete, wait_coro) 1139 1140 sleep_coro.close() 1141 1142 def test_wait_first_completed(self): 1143 1144 def gen(): 1145 when = yield 1146 self.assertAlmostEqual(10.0, when) 1147 when = yield 0 1148 self.assertAlmostEqual(0.1, when) 1149 yield 0.1 1150 1151 loop = self.new_test_loop(gen) 1152 1153 a = self.new_task(loop, asyncio.sleep(10.0)) 1154 b = self.new_task(loop, asyncio.sleep(0.1)) 1155 task = self.new_task( 1156 loop, 1157 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1158 1159 done, pending = loop.run_until_complete(task) 1160 self.assertEqual({b}, done) 1161 self.assertEqual({a}, pending) 1162 self.assertFalse(a.done()) 1163 self.assertTrue(b.done()) 1164 self.assertIsNone(b.result()) 1165 self.assertAlmostEqual(0.1, loop.time()) 1166 1167 # move forward to close generator 1168 loop.advance_time(10) 1169 loop.run_until_complete(asyncio.wait([a, b])) 1170 1171 def test_wait_really_done(self): 1172 # there is possibility that some tasks in the pending list 1173 # became done but their callbacks haven't all been called yet 1174 1175 async def coro1(): 1176 await asyncio.sleep(0) 1177 1178 async def coro2(): 1179 await asyncio.sleep(0) 1180 await asyncio.sleep(0) 1181 1182 a = self.new_task(self.loop, coro1()) 1183 b = self.new_task(self.loop, coro2()) 1184 task = self.new_task( 1185 self.loop, 1186 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1187 1188 done, pending = self.loop.run_until_complete(task) 1189 self.assertEqual({a, b}, done) 1190 self.assertTrue(a.done()) 1191 self.assertIsNone(a.result()) 1192 self.assertTrue(b.done()) 1193 self.assertIsNone(b.result()) 1194 1195 def test_wait_first_exception(self): 1196 1197 def gen(): 1198 when = yield 1199 self.assertAlmostEqual(10.0, when) 1200 yield 0 1201 1202 loop = self.new_test_loop(gen) 1203 1204 # first_exception, task already has exception 1205 a = self.new_task(loop, asyncio.sleep(10.0)) 1206 1207 async def exc(): 1208 raise ZeroDivisionError('err') 1209 1210 b = self.new_task(loop, exc()) 1211 task = self.new_task( 1212 loop, 1213 asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)) 1214 1215 done, pending = loop.run_until_complete(task) 1216 self.assertEqual({b}, done) 1217 self.assertEqual({a}, pending) 1218 self.assertAlmostEqual(0, loop.time()) 1219 1220 # move forward to close generator 1221 loop.advance_time(10) 1222 loop.run_until_complete(asyncio.wait([a, b])) 1223 1224 def test_wait_first_exception_in_wait(self): 1225 1226 def gen(): 1227 when = yield 1228 self.assertAlmostEqual(10.0, when) 1229 when = yield 0 1230 self.assertAlmostEqual(0.01, when) 1231 yield 0.01 1232 1233 loop = self.new_test_loop(gen) 1234 1235 # first_exception, exception during waiting 1236 a = self.new_task(loop, asyncio.sleep(10.0)) 1237 1238 async def exc(): 1239 await asyncio.sleep(0.01) 1240 raise ZeroDivisionError('err') 1241 1242 b = self.new_task(loop, exc()) 1243 task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION) 1244 1245 done, pending = loop.run_until_complete(task) 1246 self.assertEqual({b}, done) 1247 self.assertEqual({a}, pending) 1248 self.assertAlmostEqual(0.01, loop.time()) 1249 1250 # move forward to close generator 1251 loop.advance_time(10) 1252 loop.run_until_complete(asyncio.wait([a, b])) 1253 1254 def test_wait_with_exception(self): 1255 1256 def gen(): 1257 when = yield 1258 self.assertAlmostEqual(0.1, when) 1259 when = yield 0 1260 self.assertAlmostEqual(0.15, when) 1261 yield 0.15 1262 1263 loop = self.new_test_loop(gen) 1264 1265 a = self.new_task(loop, asyncio.sleep(0.1)) 1266 1267 async def sleeper(): 1268 await asyncio.sleep(0.15) 1269 raise ZeroDivisionError('really') 1270 1271 b = self.new_task(loop, sleeper()) 1272 1273 async def foo(): 1274 done, pending = await asyncio.wait([b, a]) 1275 self.assertEqual(len(done), 2) 1276 self.assertEqual(pending, set()) 1277 errors = set(f for f in done if f.exception() is not None) 1278 self.assertEqual(len(errors), 1) 1279 1280 loop.run_until_complete(self.new_task(loop, foo())) 1281 self.assertAlmostEqual(0.15, loop.time()) 1282 1283 loop.run_until_complete(self.new_task(loop, foo())) 1284 self.assertAlmostEqual(0.15, loop.time()) 1285 1286 def test_wait_with_timeout(self): 1287 1288 def gen(): 1289 when = yield 1290 self.assertAlmostEqual(0.1, when) 1291 when = yield 0 1292 self.assertAlmostEqual(0.15, when) 1293 when = yield 0 1294 self.assertAlmostEqual(0.11, when) 1295 yield 0.11 1296 1297 loop = self.new_test_loop(gen) 1298 1299 a = self.new_task(loop, asyncio.sleep(0.1)) 1300 b = self.new_task(loop, asyncio.sleep(0.15)) 1301 1302 async def foo(): 1303 done, pending = await asyncio.wait([b, a], timeout=0.11) 1304 self.assertEqual(done, set([a])) 1305 self.assertEqual(pending, set([b])) 1306 1307 loop.run_until_complete(self.new_task(loop, foo())) 1308 self.assertAlmostEqual(0.11, loop.time()) 1309 1310 # move forward to close generator 1311 loop.advance_time(10) 1312 loop.run_until_complete(asyncio.wait([a, b])) 1313 1314 def test_wait_concurrent_complete(self): 1315 1316 def gen(): 1317 when = yield 1318 self.assertAlmostEqual(0.1, when) 1319 when = yield 0 1320 self.assertAlmostEqual(0.15, when) 1321 when = yield 0 1322 self.assertAlmostEqual(0.1, when) 1323 yield 0.1 1324 1325 loop = self.new_test_loop(gen) 1326 1327 a = self.new_task(loop, asyncio.sleep(0.1)) 1328 b = self.new_task(loop, asyncio.sleep(0.15)) 1329 1330 done, pending = loop.run_until_complete( 1331 asyncio.wait([b, a], timeout=0.1)) 1332 1333 self.assertEqual(done, set([a])) 1334 self.assertEqual(pending, set([b])) 1335 self.assertAlmostEqual(0.1, loop.time()) 1336 1337 # move forward to close generator 1338 loop.advance_time(10) 1339 loop.run_until_complete(asyncio.wait([a, b])) 1340 1341 def test_wait_with_iterator_of_tasks(self): 1342 1343 def gen(): 1344 when = yield 1345 self.assertAlmostEqual(0.1, when) 1346 when = yield 0 1347 self.assertAlmostEqual(0.15, when) 1348 yield 0.15 1349 1350 loop = self.new_test_loop(gen) 1351 1352 a = self.new_task(loop, asyncio.sleep(0.1)) 1353 b = self.new_task(loop, asyncio.sleep(0.15)) 1354 1355 async def foo(): 1356 done, pending = await asyncio.wait(iter([b, a])) 1357 self.assertEqual(done, set([a, b])) 1358 self.assertEqual(pending, set()) 1359 return 42 1360 1361 res = loop.run_until_complete(self.new_task(loop, foo())) 1362 self.assertEqual(res, 42) 1363 self.assertAlmostEqual(0.15, loop.time()) 1364 1365 def test_as_completed(self): 1366 1367 def gen(): 1368 yield 0 1369 yield 0 1370 yield 0.01 1371 yield 0 1372 1373 loop = self.new_test_loop(gen) 1374 # disable "slow callback" warning 1375 loop.slow_callback_duration = 1.0 1376 completed = set() 1377 time_shifted = False 1378 1379 async def sleeper(dt, x): 1380 nonlocal time_shifted 1381 await asyncio.sleep(dt) 1382 completed.add(x) 1383 if not time_shifted and 'a' in completed and 'b' in completed: 1384 time_shifted = True 1385 loop.advance_time(0.14) 1386 return x 1387 1388 a = sleeper(0.01, 'a') 1389 b = sleeper(0.01, 'b') 1390 c = sleeper(0.15, 'c') 1391 1392 async def foo(): 1393 values = [] 1394 for f in asyncio.as_completed([b, c, a]): 1395 values.append(await f) 1396 return values 1397 1398 res = loop.run_until_complete(self.new_task(loop, foo())) 1399 self.assertAlmostEqual(0.15, loop.time()) 1400 self.assertTrue('a' in res[:2]) 1401 self.assertTrue('b' in res[:2]) 1402 self.assertEqual(res[2], 'c') 1403 1404 def test_as_completed_with_timeout(self): 1405 1406 def gen(): 1407 yield 1408 yield 0 1409 yield 0 1410 yield 0.1 1411 1412 loop = self.new_test_loop(gen) 1413 1414 a = loop.create_task(asyncio.sleep(0.1, 'a')) 1415 b = loop.create_task(asyncio.sleep(0.15, 'b')) 1416 1417 async def foo(): 1418 values = [] 1419 for f in asyncio.as_completed([a, b], timeout=0.12): 1420 if values: 1421 loop.advance_time(0.02) 1422 try: 1423 v = await f 1424 values.append((1, v)) 1425 except asyncio.TimeoutError as exc: 1426 values.append((2, exc)) 1427 return values 1428 1429 res = loop.run_until_complete(self.new_task(loop, foo())) 1430 self.assertEqual(len(res), 2, res) 1431 self.assertEqual(res[0], (1, 'a')) 1432 self.assertEqual(res[1][0], 2) 1433 self.assertIsInstance(res[1][1], asyncio.TimeoutError) 1434 self.assertAlmostEqual(0.12, loop.time()) 1435 1436 # move forward to close generator 1437 loop.advance_time(10) 1438 loop.run_until_complete(asyncio.wait([a, b])) 1439 1440 def test_as_completed_with_unused_timeout(self): 1441 1442 def gen(): 1443 yield 1444 yield 0 1445 yield 0.01 1446 1447 loop = self.new_test_loop(gen) 1448 1449 a = asyncio.sleep(0.01, 'a') 1450 1451 async def foo(): 1452 for f in asyncio.as_completed([a], timeout=1): 1453 v = await f 1454 self.assertEqual(v, 'a') 1455 1456 loop.run_until_complete(self.new_task(loop, foo())) 1457 1458 def test_as_completed_reverse_wait(self): 1459 1460 def gen(): 1461 yield 0 1462 yield 0.05 1463 yield 0 1464 1465 loop = self.new_test_loop(gen) 1466 1467 a = asyncio.sleep(0.05, 'a') 1468 b = asyncio.sleep(0.10, 'b') 1469 fs = {a, b} 1470 1471 async def test(): 1472 futs = list(asyncio.as_completed(fs)) 1473 self.assertEqual(len(futs), 2) 1474 1475 x = await futs[1] 1476 self.assertEqual(x, 'a') 1477 self.assertAlmostEqual(0.05, loop.time()) 1478 loop.advance_time(0.05) 1479 y = await futs[0] 1480 self.assertEqual(y, 'b') 1481 self.assertAlmostEqual(0.10, loop.time()) 1482 1483 loop.run_until_complete(test()) 1484 1485 def test_as_completed_concurrent(self): 1486 1487 def gen(): 1488 when = yield 1489 self.assertAlmostEqual(0.05, when) 1490 when = yield 0 1491 self.assertAlmostEqual(0.05, when) 1492 yield 0.05 1493 1494 a = asyncio.sleep(0.05, 'a') 1495 b = asyncio.sleep(0.05, 'b') 1496 fs = {a, b} 1497 1498 async def test(): 1499 futs = list(asyncio.as_completed(fs)) 1500 self.assertEqual(len(futs), 2) 1501 done, pending = await asyncio.wait( 1502 [asyncio.ensure_future(fut) for fut in futs] 1503 ) 1504 self.assertEqual(set(f.result() for f in done), {'a', 'b'}) 1505 1506 loop = self.new_test_loop(gen) 1507 loop.run_until_complete(test()) 1508 1509 def test_as_completed_duplicate_coroutines(self): 1510 1511 async def coro(s): 1512 return s 1513 1514 async def runner(): 1515 result = [] 1516 c = coro('ham') 1517 for f in asyncio.as_completed([c, c, coro('spam')]): 1518 result.append(await f) 1519 return result 1520 1521 fut = self.new_task(self.loop, runner()) 1522 self.loop.run_until_complete(fut) 1523 result = fut.result() 1524 self.assertEqual(set(result), {'ham', 'spam'}) 1525 self.assertEqual(len(result), 2) 1526 1527 def test_as_completed_coroutine_without_loop(self): 1528 async def coro(): 1529 return 42 1530 1531 a = coro() 1532 self.addCleanup(a.close) 1533 1534 futs = asyncio.as_completed([a]) 1535 with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 1536 list(futs) 1537 1538 def test_as_completed_coroutine_use_running_loop(self): 1539 loop = self.new_test_loop() 1540 1541 async def coro(): 1542 return 42 1543 1544 async def test(): 1545 futs = list(asyncio.as_completed([coro()])) 1546 self.assertEqual(len(futs), 1) 1547 self.assertEqual(await futs[0], 42) 1548 1549 loop.run_until_complete(test()) 1550 1551 def test_sleep(self): 1552 1553 def gen(): 1554 when = yield 1555 self.assertAlmostEqual(0.05, when) 1556 when = yield 0.05 1557 self.assertAlmostEqual(0.1, when) 1558 yield 0.05 1559 1560 loop = self.new_test_loop(gen) 1561 1562 async def sleeper(dt, arg): 1563 await asyncio.sleep(dt/2) 1564 res = await asyncio.sleep(dt/2, arg) 1565 return res 1566 1567 t = self.new_task(loop, sleeper(0.1, 'yeah')) 1568 loop.run_until_complete(t) 1569 self.assertTrue(t.done()) 1570 self.assertEqual(t.result(), 'yeah') 1571 self.assertAlmostEqual(0.1, loop.time()) 1572 1573 def test_sleep_cancel(self): 1574 1575 def gen(): 1576 when = yield 1577 self.assertAlmostEqual(10.0, when) 1578 yield 0 1579 1580 loop = self.new_test_loop(gen) 1581 1582 t = self.new_task(loop, asyncio.sleep(10.0, 'yeah')) 1583 1584 handle = None 1585 orig_call_later = loop.call_later 1586 1587 def call_later(delay, callback, *args): 1588 nonlocal handle 1589 handle = orig_call_later(delay, callback, *args) 1590 return handle 1591 1592 loop.call_later = call_later 1593 test_utils.run_briefly(loop) 1594 1595 self.assertFalse(handle._cancelled) 1596 1597 t.cancel() 1598 test_utils.run_briefly(loop) 1599 self.assertTrue(handle._cancelled) 1600 1601 def test_task_cancel_sleeping_task(self): 1602 1603 def gen(): 1604 when = yield 1605 self.assertAlmostEqual(0.1, when) 1606 when = yield 0 1607 self.assertAlmostEqual(5000, when) 1608 yield 0.1 1609 1610 loop = self.new_test_loop(gen) 1611 1612 async def sleep(dt): 1613 await asyncio.sleep(dt) 1614 1615 async def doit(): 1616 sleeper = self.new_task(loop, sleep(5000)) 1617 loop.call_later(0.1, sleeper.cancel) 1618 try: 1619 await sleeper 1620 except asyncio.CancelledError: 1621 return 'cancelled' 1622 else: 1623 return 'slept in' 1624 1625 doer = doit() 1626 self.assertEqual(loop.run_until_complete(doer), 'cancelled') 1627 self.assertAlmostEqual(0.1, loop.time()) 1628 1629 def test_task_cancel_waiter_future(self): 1630 fut = self.new_future(self.loop) 1631 1632 async def coro(): 1633 await fut 1634 1635 task = self.new_task(self.loop, coro()) 1636 test_utils.run_briefly(self.loop) 1637 self.assertIs(task._fut_waiter, fut) 1638 1639 task.cancel() 1640 test_utils.run_briefly(self.loop) 1641 self.assertRaises( 1642 asyncio.CancelledError, self.loop.run_until_complete, task) 1643 self.assertIsNone(task._fut_waiter) 1644 self.assertTrue(fut.cancelled()) 1645 1646 def test_task_set_methods(self): 1647 async def notmuch(): 1648 return 'ko' 1649 1650 gen = notmuch() 1651 task = self.new_task(self.loop, gen) 1652 1653 with self.assertRaisesRegex(RuntimeError, 'not support set_result'): 1654 task.set_result('ok') 1655 1656 with self.assertRaisesRegex(RuntimeError, 'not support set_exception'): 1657 task.set_exception(ValueError()) 1658 1659 self.assertEqual( 1660 self.loop.run_until_complete(task), 1661 'ko') 1662 1663 def test_step_result_future(self): 1664 # If coroutine returns future, task waits on this future. 1665 1666 class Fut(asyncio.Future): 1667 def __init__(self, *args, **kwds): 1668 self.cb_added = False 1669 super().__init__(*args, **kwds) 1670 1671 def add_done_callback(self, *args, **kwargs): 1672 self.cb_added = True 1673 super().add_done_callback(*args, **kwargs) 1674 1675 fut = Fut(loop=self.loop) 1676 result = None 1677 1678 async def wait_for_future(): 1679 nonlocal result 1680 result = await fut 1681 1682 t = self.new_task(self.loop, wait_for_future()) 1683 test_utils.run_briefly(self.loop) 1684 self.assertTrue(fut.cb_added) 1685 1686 res = object() 1687 fut.set_result(res) 1688 test_utils.run_briefly(self.loop) 1689 self.assertIs(res, result) 1690 self.assertTrue(t.done()) 1691 self.assertIsNone(t.result()) 1692 1693 def test_baseexception_during_cancel(self): 1694 1695 def gen(): 1696 when = yield 1697 self.assertAlmostEqual(10.0, when) 1698 yield 0 1699 1700 loop = self.new_test_loop(gen) 1701 1702 async def sleeper(): 1703 await asyncio.sleep(10) 1704 1705 base_exc = SystemExit() 1706 1707 async def notmutch(): 1708 try: 1709 await sleeper() 1710 except asyncio.CancelledError: 1711 raise base_exc 1712 1713 task = self.new_task(loop, notmutch()) 1714 test_utils.run_briefly(loop) 1715 1716 task.cancel() 1717 self.assertFalse(task.done()) 1718 1719 self.assertRaises(SystemExit, test_utils.run_briefly, loop) 1720 1721 self.assertTrue(task.done()) 1722 self.assertFalse(task.cancelled()) 1723 self.assertIs(task.exception(), base_exc) 1724 1725 def test_iscoroutinefunction(self): 1726 def fn(): 1727 pass 1728 1729 self.assertFalse(asyncio.iscoroutinefunction(fn)) 1730 1731 def fn1(): 1732 yield 1733 self.assertFalse(asyncio.iscoroutinefunction(fn1)) 1734 1735 async def fn2(): 1736 pass 1737 self.assertTrue(asyncio.iscoroutinefunction(fn2)) 1738 1739 self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) 1740 self.assertTrue(asyncio.iscoroutinefunction(mock.AsyncMock())) 1741 1742 def test_coroutine_non_gen_function(self): 1743 async def func(): 1744 return 'test' 1745 1746 self.assertTrue(asyncio.iscoroutinefunction(func)) 1747 1748 coro = func() 1749 self.assertTrue(asyncio.iscoroutine(coro)) 1750 1751 res = self.loop.run_until_complete(coro) 1752 self.assertEqual(res, 'test') 1753 1754 def test_coroutine_non_gen_function_return_future(self): 1755 fut = self.new_future(self.loop) 1756 1757 async def func(): 1758 return fut 1759 1760 async def coro(): 1761 fut.set_result('test') 1762 1763 t1 = self.new_task(self.loop, func()) 1764 t2 = self.new_task(self.loop, coro()) 1765 res = self.loop.run_until_complete(t1) 1766 self.assertEqual(res, fut) 1767 self.assertIsNone(t2.result()) 1768 1769 def test_current_task(self): 1770 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1771 1772 async def coro(loop): 1773 self.assertIs(asyncio.current_task(), task) 1774 1775 self.assertIs(asyncio.current_task(None), task) 1776 self.assertIs(asyncio.current_task(), task) 1777 1778 task = self.new_task(self.loop, coro(self.loop)) 1779 self.loop.run_until_complete(task) 1780 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1781 1782 def test_current_task_with_interleaving_tasks(self): 1783 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1784 1785 fut1 = self.new_future(self.loop) 1786 fut2 = self.new_future(self.loop) 1787 1788 async def coro1(loop): 1789 self.assertTrue(asyncio.current_task() is task1) 1790 await fut1 1791 self.assertTrue(asyncio.current_task() is task1) 1792 fut2.set_result(True) 1793 1794 async def coro2(loop): 1795 self.assertTrue(asyncio.current_task() is task2) 1796 fut1.set_result(True) 1797 await fut2 1798 self.assertTrue(asyncio.current_task() is task2) 1799 1800 task1 = self.new_task(self.loop, coro1(self.loop)) 1801 task2 = self.new_task(self.loop, coro2(self.loop)) 1802 1803 self.loop.run_until_complete(asyncio.wait((task1, task2))) 1804 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1805 1806 # Some thorough tests for cancellation propagation through 1807 # coroutines, tasks and wait(). 1808 1809 def test_yield_future_passes_cancel(self): 1810 # Cancelling outer() cancels inner() cancels waiter. 1811 proof = 0 1812 waiter = self.new_future(self.loop) 1813 1814 async def inner(): 1815 nonlocal proof 1816 try: 1817 await waiter 1818 except asyncio.CancelledError: 1819 proof += 1 1820 raise 1821 else: 1822 self.fail('got past sleep() in inner()') 1823 1824 async def outer(): 1825 nonlocal proof 1826 try: 1827 await inner() 1828 except asyncio.CancelledError: 1829 proof += 100 # Expect this path. 1830 else: 1831 proof += 10 1832 1833 f = asyncio.ensure_future(outer(), loop=self.loop) 1834 test_utils.run_briefly(self.loop) 1835 f.cancel() 1836 self.loop.run_until_complete(f) 1837 self.assertEqual(proof, 101) 1838 self.assertTrue(waiter.cancelled()) 1839 1840 def test_yield_wait_does_not_shield_cancel(self): 1841 # Cancelling outer() makes wait() return early, leaves inner() 1842 # running. 1843 proof = 0 1844 waiter = self.new_future(self.loop) 1845 1846 async def inner(): 1847 nonlocal proof 1848 await waiter 1849 proof += 1 1850 1851 async def outer(): 1852 nonlocal proof 1853 with self.assertWarns(DeprecationWarning): 1854 d, p = await asyncio.wait([asyncio.create_task(inner())]) 1855 proof += 100 1856 1857 f = asyncio.ensure_future(outer(), loop=self.loop) 1858 test_utils.run_briefly(self.loop) 1859 f.cancel() 1860 self.assertRaises( 1861 asyncio.CancelledError, self.loop.run_until_complete, f) 1862 waiter.set_result(None) 1863 test_utils.run_briefly(self.loop) 1864 self.assertEqual(proof, 1) 1865 1866 def test_shield_result(self): 1867 inner = self.new_future(self.loop) 1868 outer = asyncio.shield(inner) 1869 inner.set_result(42) 1870 res = self.loop.run_until_complete(outer) 1871 self.assertEqual(res, 42) 1872 1873 def test_shield_exception(self): 1874 inner = self.new_future(self.loop) 1875 outer = asyncio.shield(inner) 1876 test_utils.run_briefly(self.loop) 1877 exc = RuntimeError('expected') 1878 inner.set_exception(exc) 1879 test_utils.run_briefly(self.loop) 1880 self.assertIs(outer.exception(), exc) 1881 1882 def test_shield_cancel_inner(self): 1883 inner = self.new_future(self.loop) 1884 outer = asyncio.shield(inner) 1885 test_utils.run_briefly(self.loop) 1886 inner.cancel() 1887 test_utils.run_briefly(self.loop) 1888 self.assertTrue(outer.cancelled()) 1889 1890 def test_shield_cancel_outer(self): 1891 inner = self.new_future(self.loop) 1892 outer = asyncio.shield(inner) 1893 test_utils.run_briefly(self.loop) 1894 outer.cancel() 1895 test_utils.run_briefly(self.loop) 1896 self.assertTrue(outer.cancelled()) 1897 self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks)) 1898 1899 def test_shield_shortcut(self): 1900 fut = self.new_future(self.loop) 1901 fut.set_result(42) 1902 res = self.loop.run_until_complete(asyncio.shield(fut)) 1903 self.assertEqual(res, 42) 1904 1905 def test_shield_effect(self): 1906 # Cancelling outer() does not affect inner(). 1907 proof = 0 1908 waiter = self.new_future(self.loop) 1909 1910 async def inner(): 1911 nonlocal proof 1912 await waiter 1913 proof += 1 1914 1915 async def outer(): 1916 nonlocal proof 1917 await asyncio.shield(inner()) 1918 proof += 100 1919 1920 f = asyncio.ensure_future(outer(), loop=self.loop) 1921 test_utils.run_briefly(self.loop) 1922 f.cancel() 1923 with self.assertRaises(asyncio.CancelledError): 1924 self.loop.run_until_complete(f) 1925 waiter.set_result(None) 1926 test_utils.run_briefly(self.loop) 1927 self.assertEqual(proof, 1) 1928 1929 def test_shield_gather(self): 1930 child1 = self.new_future(self.loop) 1931 child2 = self.new_future(self.loop) 1932 parent = asyncio.gather(child1, child2) 1933 outer = asyncio.shield(parent) 1934 test_utils.run_briefly(self.loop) 1935 outer.cancel() 1936 test_utils.run_briefly(self.loop) 1937 self.assertTrue(outer.cancelled()) 1938 child1.set_result(1) 1939 child2.set_result(2) 1940 test_utils.run_briefly(self.loop) 1941 self.assertEqual(parent.result(), [1, 2]) 1942 1943 def test_gather_shield(self): 1944 child1 = self.new_future(self.loop) 1945 child2 = self.new_future(self.loop) 1946 inner1 = asyncio.shield(child1) 1947 inner2 = asyncio.shield(child2) 1948 parent = asyncio.gather(inner1, inner2) 1949 test_utils.run_briefly(self.loop) 1950 parent.cancel() 1951 # This should cancel inner1 and inner2 but bot child1 and child2. 1952 test_utils.run_briefly(self.loop) 1953 self.assertIsInstance(parent.exception(), asyncio.CancelledError) 1954 self.assertTrue(inner1.cancelled()) 1955 self.assertTrue(inner2.cancelled()) 1956 child1.set_result(1) 1957 child2.set_result(2) 1958 test_utils.run_briefly(self.loop) 1959 1960 def test_shield_coroutine_without_loop(self): 1961 async def coro(): 1962 return 42 1963 1964 inner = coro() 1965 self.addCleanup(inner.close) 1966 with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 1967 asyncio.shield(inner) 1968 1969 def test_shield_coroutine_use_running_loop(self): 1970 async def coro(): 1971 return 42 1972 1973 async def test(): 1974 return asyncio.shield(coro()) 1975 outer = self.loop.run_until_complete(test()) 1976 self.assertEqual(outer._loop, self.loop) 1977 res = self.loop.run_until_complete(outer) 1978 self.assertEqual(res, 42) 1979 1980 def test_shield_coroutine_use_global_loop(self): 1981 # Deprecated in 3.10, undeprecated in 3.11.1 1982 async def coro(): 1983 return 42 1984 1985 asyncio.set_event_loop(self.loop) 1986 self.addCleanup(asyncio.set_event_loop, None) 1987 outer = asyncio.shield(coro()) 1988 self.assertEqual(outer._loop, self.loop) 1989 res = self.loop.run_until_complete(outer) 1990 self.assertEqual(res, 42) 1991 1992 def test_as_completed_invalid_args(self): 1993 fut = self.new_future(self.loop) 1994 1995 # as_completed() expects a list of futures, not a future instance 1996 self.assertRaises(TypeError, self.loop.run_until_complete, 1997 asyncio.as_completed(fut)) 1998 coro = coroutine_function() 1999 self.assertRaises(TypeError, self.loop.run_until_complete, 2000 asyncio.as_completed(coro)) 2001 coro.close() 2002 2003 def test_wait_invalid_args(self): 2004 fut = self.new_future(self.loop) 2005 2006 # wait() expects a list of futures, not a future instance 2007 self.assertRaises(TypeError, self.loop.run_until_complete, 2008 asyncio.wait(fut)) 2009 coro = coroutine_function() 2010 self.assertRaises(TypeError, self.loop.run_until_complete, 2011 asyncio.wait(coro)) 2012 coro.close() 2013 2014 # wait() expects at least a future 2015 self.assertRaises(ValueError, self.loop.run_until_complete, 2016 asyncio.wait([])) 2017 2018 def test_log_destroyed_pending_task(self): 2019 Task = self.__class__.Task 2020 2021 async def kill_me(loop): 2022 future = self.new_future(loop) 2023 await future 2024 # at this point, the only reference to kill_me() task is 2025 # the Task._wakeup() method in future._callbacks 2026 raise Exception("code never reached") 2027 2028 mock_handler = mock.Mock() 2029 self.loop.set_debug(True) 2030 self.loop.set_exception_handler(mock_handler) 2031 2032 # schedule the task 2033 coro = kill_me(self.loop) 2034 task = asyncio.ensure_future(coro, loop=self.loop) 2035 2036 self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) 2037 2038 asyncio.set_event_loop(None) 2039 2040 # execute the task so it waits for future 2041 self.loop._run_once() 2042 self.assertEqual(len(self.loop._ready), 0) 2043 2044 coro = None 2045 source_traceback = task._source_traceback 2046 task = None 2047 2048 # no more reference to kill_me() task: the task is destroyed by the GC 2049 support.gc_collect() 2050 2051 self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) 2052 2053 mock_handler.assert_called_with(self.loop, { 2054 'message': 'Task was destroyed but it is pending!', 2055 'task': mock.ANY, 2056 'source_traceback': source_traceback, 2057 }) 2058 mock_handler.reset_mock() 2059 2060 @mock.patch('asyncio.base_events.logger') 2061 def test_tb_logger_not_called_after_cancel(self, m_log): 2062 loop = asyncio.new_event_loop() 2063 self.set_event_loop(loop) 2064 2065 async def coro(): 2066 raise TypeError 2067 2068 async def runner(): 2069 task = self.new_task(loop, coro()) 2070 await asyncio.sleep(0.05) 2071 task.cancel() 2072 task = None 2073 2074 loop.run_until_complete(runner()) 2075 self.assertFalse(m_log.error.called) 2076 2077 def test_task_source_traceback(self): 2078 self.loop.set_debug(True) 2079 2080 task = self.new_task(self.loop, coroutine_function()) 2081 lineno = sys._getframe().f_lineno - 1 2082 self.assertIsInstance(task._source_traceback, list) 2083 self.assertEqual(task._source_traceback[-2][:3], 2084 (__file__, 2085 lineno, 2086 'test_task_source_traceback')) 2087 self.loop.run_until_complete(task) 2088 2089 def test_cancel_gather_1(self): 2090 """Ensure that a gathering future refuses to be cancelled once all 2091 children are done""" 2092 loop = asyncio.new_event_loop() 2093 self.addCleanup(loop.close) 2094 2095 fut = self.new_future(loop) 2096 async def create(): 2097 # The indirection fut->child_coro is needed since otherwise the 2098 # gathering task is done at the same time as the child future 2099 def child_coro(): 2100 return (yield from fut) 2101 gather_future = asyncio.gather(child_coro()) 2102 return asyncio.ensure_future(gather_future) 2103 gather_task = loop.run_until_complete(create()) 2104 2105 cancel_result = None 2106 def cancelling_callback(_): 2107 nonlocal cancel_result 2108 cancel_result = gather_task.cancel() 2109 fut.add_done_callback(cancelling_callback) 2110 2111 fut.set_result(42) # calls the cancelling_callback after fut is done() 2112 2113 # At this point the task should complete. 2114 loop.run_until_complete(gather_task) 2115 2116 # Python issue #26923: asyncio.gather drops cancellation 2117 self.assertEqual(cancel_result, False) 2118 self.assertFalse(gather_task.cancelled()) 2119 self.assertEqual(gather_task.result(), [42]) 2120 2121 def test_cancel_gather_2(self): 2122 cases = [ 2123 ((), ()), 2124 ((None,), ()), 2125 (('my message',), ('my message',)), 2126 # Non-string values should roundtrip. 2127 ((5,), (5,)), 2128 ] 2129 for cancel_args, expected_args in cases: 2130 with self.subTest(cancel_args=cancel_args): 2131 loop = asyncio.new_event_loop() 2132 self.addCleanup(loop.close) 2133 2134 async def test(): 2135 time = 0 2136 while True: 2137 time += 0.05 2138 await asyncio.gather(asyncio.sleep(0.05), 2139 return_exceptions=True) 2140 if time > 1: 2141 return 2142 2143 async def main(): 2144 qwe = self.new_task(loop, test()) 2145 await asyncio.sleep(0.2) 2146 qwe.cancel(*cancel_args) 2147 await qwe 2148 2149 try: 2150 loop.run_until_complete(main()) 2151 except asyncio.CancelledError as exc: 2152 self.assertEqual(exc.args, expected_args) 2153 actual = get_innermost_context(exc) 2154 self.assertEqual( 2155 actual, 2156 (asyncio.CancelledError, expected_args, 0), 2157 ) 2158 else: 2159 self.fail( 2160 'gather() does not propagate CancelledError ' 2161 'raised by inner task to the gather() caller.' 2162 ) 2163 2164 def test_exception_traceback(self): 2165 # See http://bugs.python.org/issue28843 2166 2167 async def foo(): 2168 1 / 0 2169 2170 async def main(): 2171 task = self.new_task(self.loop, foo()) 2172 await asyncio.sleep(0) # skip one loop iteration 2173 self.assertIsNotNone(task.exception().__traceback__) 2174 2175 self.loop.run_until_complete(main()) 2176 2177 @mock.patch('asyncio.base_events.logger') 2178 def test_error_in_call_soon(self, m_log): 2179 def call_soon(callback, *args, **kwargs): 2180 raise ValueError 2181 self.loop.call_soon = call_soon 2182 2183 async def coro(): 2184 pass 2185 2186 self.assertFalse(m_log.error.called) 2187 2188 with self.assertRaises(ValueError): 2189 gen = coro() 2190 try: 2191 self.new_task(self.loop, gen) 2192 finally: 2193 gen.close() 2194 gc.collect() # For PyPy or other GCs. 2195 2196 self.assertTrue(m_log.error.called) 2197 message = m_log.error.call_args[0][0] 2198 self.assertIn('Task was destroyed but it is pending', message) 2199 2200 self.assertEqual(asyncio.all_tasks(self.loop), set()) 2201 2202 def test_create_task_with_noncoroutine(self): 2203 with self.assertRaisesRegex(TypeError, 2204 "a coroutine was expected, got 123"): 2205 self.new_task(self.loop, 123) 2206 2207 # test it for the second time to ensure that caching 2208 # in asyncio.iscoroutine() doesn't break things. 2209 with self.assertRaisesRegex(TypeError, 2210 "a coroutine was expected, got 123"): 2211 self.new_task(self.loop, 123) 2212 2213 def test_create_task_with_async_function(self): 2214 2215 async def coro(): 2216 pass 2217 2218 task = self.new_task(self.loop, coro()) 2219 self.assertIsInstance(task, self.Task) 2220 self.loop.run_until_complete(task) 2221 2222 # test it for the second time to ensure that caching 2223 # in asyncio.iscoroutine() doesn't break things. 2224 task = self.new_task(self.loop, coro()) 2225 self.assertIsInstance(task, self.Task) 2226 self.loop.run_until_complete(task) 2227 2228 def test_create_task_with_asynclike_function(self): 2229 task = self.new_task(self.loop, CoroLikeObject()) 2230 self.assertIsInstance(task, self.Task) 2231 self.assertEqual(self.loop.run_until_complete(task), 42) 2232 2233 # test it for the second time to ensure that caching 2234 # in asyncio.iscoroutine() doesn't break things. 2235 task = self.new_task(self.loop, CoroLikeObject()) 2236 self.assertIsInstance(task, self.Task) 2237 self.assertEqual(self.loop.run_until_complete(task), 42) 2238 2239 def test_bare_create_task(self): 2240 2241 async def inner(): 2242 return 1 2243 2244 async def coro(): 2245 task = asyncio.create_task(inner()) 2246 self.assertIsInstance(task, self.Task) 2247 ret = await task 2248 self.assertEqual(1, ret) 2249 2250 self.loop.run_until_complete(coro()) 2251 2252 def test_bare_create_named_task(self): 2253 2254 async def coro_noop(): 2255 pass 2256 2257 async def coro(): 2258 task = asyncio.create_task(coro_noop(), name='No-op') 2259 self.assertEqual(task.get_name(), 'No-op') 2260 await task 2261 2262 self.loop.run_until_complete(coro()) 2263 2264 def test_context_1(self): 2265 cvar = contextvars.ContextVar('cvar', default='nope') 2266 2267 async def sub(): 2268 await asyncio.sleep(0.01) 2269 self.assertEqual(cvar.get(), 'nope') 2270 cvar.set('something else') 2271 2272 async def main(): 2273 self.assertEqual(cvar.get(), 'nope') 2274 subtask = self.new_task(loop, sub()) 2275 cvar.set('yes') 2276 self.assertEqual(cvar.get(), 'yes') 2277 await subtask 2278 self.assertEqual(cvar.get(), 'yes') 2279 2280 loop = asyncio.new_event_loop() 2281 try: 2282 task = self.new_task(loop, main()) 2283 loop.run_until_complete(task) 2284 finally: 2285 loop.close() 2286 2287 def test_context_2(self): 2288 cvar = contextvars.ContextVar('cvar', default='nope') 2289 2290 async def main(): 2291 def fut_on_done(fut): 2292 # This change must not pollute the context 2293 # of the "main()" task. 2294 cvar.set('something else') 2295 2296 self.assertEqual(cvar.get(), 'nope') 2297 2298 for j in range(2): 2299 fut = self.new_future(loop) 2300 fut.add_done_callback(fut_on_done) 2301 cvar.set(f'yes{j}') 2302 loop.call_soon(fut.set_result, None) 2303 await fut 2304 self.assertEqual(cvar.get(), f'yes{j}') 2305 2306 for i in range(3): 2307 # Test that task passed its context to add_done_callback: 2308 cvar.set(f'yes{i}-{j}') 2309 await asyncio.sleep(0.001) 2310 self.assertEqual(cvar.get(), f'yes{i}-{j}') 2311 2312 loop = asyncio.new_event_loop() 2313 try: 2314 task = self.new_task(loop, main()) 2315 loop.run_until_complete(task) 2316 finally: 2317 loop.close() 2318 2319 self.assertEqual(cvar.get(), 'nope') 2320 2321 def test_context_3(self): 2322 # Run 100 Tasks in parallel, each modifying cvar. 2323 2324 cvar = contextvars.ContextVar('cvar', default=-1) 2325 2326 async def sub(num): 2327 for i in range(10): 2328 cvar.set(num + i) 2329 await asyncio.sleep(random.uniform(0.001, 0.05)) 2330 self.assertEqual(cvar.get(), num + i) 2331 2332 async def main(): 2333 tasks = [] 2334 for i in range(100): 2335 task = loop.create_task(sub(random.randint(0, 10))) 2336 tasks.append(task) 2337 2338 await asyncio.gather(*tasks) 2339 2340 loop = asyncio.new_event_loop() 2341 try: 2342 loop.run_until_complete(main()) 2343 finally: 2344 loop.close() 2345 2346 self.assertEqual(cvar.get(), -1) 2347 2348 def test_context_4(self): 2349 cvar = contextvars.ContextVar('cvar') 2350 2351 async def coro(val): 2352 await asyncio.sleep(0) 2353 cvar.set(val) 2354 2355 async def main(): 2356 ret = [] 2357 ctx = contextvars.copy_context() 2358 ret.append(ctx.get(cvar)) 2359 t1 = self.new_task(loop, coro(1), context=ctx) 2360 await t1 2361 ret.append(ctx.get(cvar)) 2362 t2 = self.new_task(loop, coro(2), context=ctx) 2363 await t2 2364 ret.append(ctx.get(cvar)) 2365 return ret 2366 2367 loop = asyncio.new_event_loop() 2368 try: 2369 task = self.new_task(loop, main()) 2370 ret = loop.run_until_complete(task) 2371 finally: 2372 loop.close() 2373 2374 self.assertEqual([None, 1, 2], ret) 2375 2376 def test_context_5(self): 2377 cvar = contextvars.ContextVar('cvar') 2378 2379 async def coro(val): 2380 await asyncio.sleep(0) 2381 cvar.set(val) 2382 2383 async def main(): 2384 ret = [] 2385 ctx = contextvars.copy_context() 2386 ret.append(ctx.get(cvar)) 2387 t1 = asyncio.create_task(coro(1), context=ctx) 2388 await t1 2389 ret.append(ctx.get(cvar)) 2390 t2 = asyncio.create_task(coro(2), context=ctx) 2391 await t2 2392 ret.append(ctx.get(cvar)) 2393 return ret 2394 2395 loop = asyncio.new_event_loop() 2396 try: 2397 task = self.new_task(loop, main()) 2398 ret = loop.run_until_complete(task) 2399 finally: 2400 loop.close() 2401 2402 self.assertEqual([None, 1, 2], ret) 2403 2404 def test_context_6(self): 2405 cvar = contextvars.ContextVar('cvar') 2406 2407 async def coro(val): 2408 await asyncio.sleep(0) 2409 cvar.set(val) 2410 2411 async def main(): 2412 ret = [] 2413 ctx = contextvars.copy_context() 2414 ret.append(ctx.get(cvar)) 2415 t1 = loop.create_task(coro(1), context=ctx) 2416 await t1 2417 ret.append(ctx.get(cvar)) 2418 t2 = loop.create_task(coro(2), context=ctx) 2419 await t2 2420 ret.append(ctx.get(cvar)) 2421 return ret 2422 2423 loop = asyncio.new_event_loop() 2424 try: 2425 task = loop.create_task(main()) 2426 ret = loop.run_until_complete(task) 2427 finally: 2428 loop.close() 2429 2430 self.assertEqual([None, 1, 2], ret) 2431 2432 def test_get_coro(self): 2433 loop = asyncio.new_event_loop() 2434 coro = coroutine_function() 2435 try: 2436 task = self.new_task(loop, coro) 2437 loop.run_until_complete(task) 2438 self.assertIs(task.get_coro(), coro) 2439 finally: 2440 loop.close() 2441 2442 2443def add_subclass_tests(cls): 2444 BaseTask = cls.Task 2445 BaseFuture = cls.Future 2446 2447 if BaseTask is None or BaseFuture is None: 2448 return cls 2449 2450 class CommonFuture: 2451 def __init__(self, *args, **kwargs): 2452 self.calls = collections.defaultdict(lambda: 0) 2453 super().__init__(*args, **kwargs) 2454 2455 def add_done_callback(self, *args, **kwargs): 2456 self.calls['add_done_callback'] += 1 2457 return super().add_done_callback(*args, **kwargs) 2458 2459 class Task(CommonFuture, BaseTask): 2460 pass 2461 2462 class Future(CommonFuture, BaseFuture): 2463 pass 2464 2465 def test_subclasses_ctask_cfuture(self): 2466 fut = self.Future(loop=self.loop) 2467 2468 async def func(): 2469 self.loop.call_soon(lambda: fut.set_result('spam')) 2470 return await fut 2471 2472 task = self.Task(func(), loop=self.loop) 2473 2474 result = self.loop.run_until_complete(task) 2475 2476 self.assertEqual(result, 'spam') 2477 2478 self.assertEqual( 2479 dict(task.calls), 2480 {'add_done_callback': 1}) 2481 2482 self.assertEqual( 2483 dict(fut.calls), 2484 {'add_done_callback': 1}) 2485 2486 # Add patched Task & Future back to the test case 2487 cls.Task = Task 2488 cls.Future = Future 2489 2490 # Add an extra unit-test 2491 cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 2492 2493 # Disable the "test_task_source_traceback" test 2494 # (the test is hardcoded for a particular call stack, which 2495 # is slightly different for Task subclasses) 2496 cls.test_task_source_traceback = None 2497 2498 return cls 2499 2500 2501class SetMethodsTest: 2502 2503 def test_set_result_causes_invalid_state(self): 2504 Future = type(self).Future 2505 self.loop.call_exception_handler = exc_handler = mock.Mock() 2506 2507 async def foo(): 2508 await asyncio.sleep(0.1) 2509 return 10 2510 2511 coro = foo() 2512 task = self.new_task(self.loop, coro) 2513 Future.set_result(task, 'spam') 2514 2515 self.assertEqual( 2516 self.loop.run_until_complete(task), 2517 'spam') 2518 2519 exc_handler.assert_called_once() 2520 exc = exc_handler.call_args[0][0]['exception'] 2521 with self.assertRaisesRegex(asyncio.InvalidStateError, 2522 r'step\(\): already done'): 2523 raise exc 2524 2525 coro.close() 2526 2527 def test_set_exception_causes_invalid_state(self): 2528 class MyExc(Exception): 2529 pass 2530 2531 Future = type(self).Future 2532 self.loop.call_exception_handler = exc_handler = mock.Mock() 2533 2534 async def foo(): 2535 await asyncio.sleep(0.1) 2536 return 10 2537 2538 coro = foo() 2539 task = self.new_task(self.loop, coro) 2540 Future.set_exception(task, MyExc()) 2541 2542 with self.assertRaises(MyExc): 2543 self.loop.run_until_complete(task) 2544 2545 exc_handler.assert_called_once() 2546 exc = exc_handler.call_args[0][0]['exception'] 2547 with self.assertRaisesRegex(asyncio.InvalidStateError, 2548 r'step\(\): already done'): 2549 raise exc 2550 2551 coro.close() 2552 2553 2554@unittest.skipUnless(hasattr(futures, '_CFuture') and 2555 hasattr(tasks, '_CTask'), 2556 'requires the C _asyncio module') 2557class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 2558 test_utils.TestCase): 2559 2560 Task = getattr(tasks, '_CTask', None) 2561 Future = getattr(futures, '_CFuture', None) 2562 2563 @support.refcount_test 2564 def test_refleaks_in_task___init__(self): 2565 gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 2566 async def coro(): 2567 pass 2568 task = self.new_task(self.loop, coro()) 2569 self.loop.run_until_complete(task) 2570 refs_before = gettotalrefcount() 2571 for i in range(100): 2572 task.__init__(coro(), loop=self.loop) 2573 self.loop.run_until_complete(task) 2574 self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 2575 2576 def test_del__log_destroy_pending_segfault(self): 2577 async def coro(): 2578 pass 2579 task = self.new_task(self.loop, coro()) 2580 self.loop.run_until_complete(task) 2581 with self.assertRaises(AttributeError): 2582 del task._log_destroy_pending 2583 2584 2585@unittest.skipUnless(hasattr(futures, '_CFuture') and 2586 hasattr(tasks, '_CTask'), 2587 'requires the C _asyncio module') 2588@add_subclass_tests 2589class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2590 2591 Task = getattr(tasks, '_CTask', None) 2592 Future = getattr(futures, '_CFuture', None) 2593 2594 2595@unittest.skipUnless(hasattr(tasks, '_CTask'), 2596 'requires the C _asyncio module') 2597@add_subclass_tests 2598class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2599 2600 Task = getattr(tasks, '_CTask', None) 2601 Future = futures._PyFuture 2602 2603 2604@unittest.skipUnless(hasattr(futures, '_CFuture'), 2605 'requires the C _asyncio module') 2606@add_subclass_tests 2607class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase): 2608 2609 Future = getattr(futures, '_CFuture', None) 2610 Task = tasks._PyTask 2611 2612 2613@unittest.skipUnless(hasattr(tasks, '_CTask'), 2614 'requires the C _asyncio module') 2615class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2616 2617 Task = getattr(tasks, '_CTask', None) 2618 Future = futures._PyFuture 2619 2620 2621@unittest.skipUnless(hasattr(futures, '_CFuture'), 2622 'requires the C _asyncio module') 2623class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): 2624 2625 Task = tasks._PyTask 2626 Future = getattr(futures, '_CFuture', None) 2627 2628 2629class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, 2630 test_utils.TestCase): 2631 2632 Task = tasks._PyTask 2633 Future = futures._PyFuture 2634 2635 2636@add_subclass_tests 2637class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2638 Task = tasks._PyTask 2639 Future = futures._PyFuture 2640 2641 2642@unittest.skipUnless(hasattr(tasks, '_CTask'), 2643 'requires the C _asyncio module') 2644class CTask_Future_Tests(test_utils.TestCase): 2645 2646 def test_foobar(self): 2647 class Fut(asyncio.Future): 2648 @property 2649 def get_loop(self): 2650 raise AttributeError 2651 2652 async def coro(): 2653 await fut 2654 return 'spam' 2655 2656 self.loop = asyncio.new_event_loop() 2657 try: 2658 fut = Fut(loop=self.loop) 2659 self.loop.call_later(0.1, fut.set_result, 1) 2660 task = self.loop.create_task(coro()) 2661 res = self.loop.run_until_complete(task) 2662 finally: 2663 self.loop.close() 2664 2665 self.assertEqual(res, 'spam') 2666 2667 2668class BaseTaskIntrospectionTests: 2669 _register_task = None 2670 _unregister_task = None 2671 _enter_task = None 2672 _leave_task = None 2673 2674 def test__register_task_1(self): 2675 class TaskLike: 2676 @property 2677 def _loop(self): 2678 return loop 2679 2680 def done(self): 2681 return False 2682 2683 task = TaskLike() 2684 loop = mock.Mock() 2685 2686 self.assertEqual(asyncio.all_tasks(loop), set()) 2687 self._register_task(task) 2688 self.assertEqual(asyncio.all_tasks(loop), {task}) 2689 self._unregister_task(task) 2690 2691 def test__register_task_2(self): 2692 class TaskLike: 2693 def get_loop(self): 2694 return loop 2695 2696 def done(self): 2697 return False 2698 2699 task = TaskLike() 2700 loop = mock.Mock() 2701 2702 self.assertEqual(asyncio.all_tasks(loop), set()) 2703 self._register_task(task) 2704 self.assertEqual(asyncio.all_tasks(loop), {task}) 2705 self._unregister_task(task) 2706 2707 def test__register_task_3(self): 2708 class TaskLike: 2709 def get_loop(self): 2710 return loop 2711 2712 def done(self): 2713 return True 2714 2715 task = TaskLike() 2716 loop = mock.Mock() 2717 2718 self.assertEqual(asyncio.all_tasks(loop), set()) 2719 self._register_task(task) 2720 self.assertEqual(asyncio.all_tasks(loop), set()) 2721 self._unregister_task(task) 2722 2723 def test__enter_task(self): 2724 task = mock.Mock() 2725 loop = mock.Mock() 2726 self.assertIsNone(asyncio.current_task(loop)) 2727 self._enter_task(loop, task) 2728 self.assertIs(asyncio.current_task(loop), task) 2729 self._leave_task(loop, task) 2730 2731 def test__enter_task_failure(self): 2732 task1 = mock.Mock() 2733 task2 = mock.Mock() 2734 loop = mock.Mock() 2735 self._enter_task(loop, task1) 2736 with self.assertRaises(RuntimeError): 2737 self._enter_task(loop, task2) 2738 self.assertIs(asyncio.current_task(loop), task1) 2739 self._leave_task(loop, task1) 2740 2741 def test__leave_task(self): 2742 task = mock.Mock() 2743 loop = mock.Mock() 2744 self._enter_task(loop, task) 2745 self._leave_task(loop, task) 2746 self.assertIsNone(asyncio.current_task(loop)) 2747 2748 def test__leave_task_failure1(self): 2749 task1 = mock.Mock() 2750 task2 = mock.Mock() 2751 loop = mock.Mock() 2752 self._enter_task(loop, task1) 2753 with self.assertRaises(RuntimeError): 2754 self._leave_task(loop, task2) 2755 self.assertIs(asyncio.current_task(loop), task1) 2756 self._leave_task(loop, task1) 2757 2758 def test__leave_task_failure2(self): 2759 task = mock.Mock() 2760 loop = mock.Mock() 2761 with self.assertRaises(RuntimeError): 2762 self._leave_task(loop, task) 2763 self.assertIsNone(asyncio.current_task(loop)) 2764 2765 def test__unregister_task(self): 2766 task = mock.Mock() 2767 loop = mock.Mock() 2768 task.get_loop = lambda: loop 2769 self._register_task(task) 2770 self._unregister_task(task) 2771 self.assertEqual(asyncio.all_tasks(loop), set()) 2772 2773 def test__unregister_task_not_registered(self): 2774 task = mock.Mock() 2775 loop = mock.Mock() 2776 self._unregister_task(task) 2777 self.assertEqual(asyncio.all_tasks(loop), set()) 2778 2779 2780class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 2781 _register_task = staticmethod(tasks._py_register_task) 2782 _unregister_task = staticmethod(tasks._py_unregister_task) 2783 _enter_task = staticmethod(tasks._py_enter_task) 2784 _leave_task = staticmethod(tasks._py_leave_task) 2785 2786 2787@unittest.skipUnless(hasattr(tasks, '_c_register_task'), 2788 'requires the C _asyncio module') 2789class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 2790 if hasattr(tasks, '_c_register_task'): 2791 _register_task = staticmethod(tasks._c_register_task) 2792 _unregister_task = staticmethod(tasks._c_unregister_task) 2793 _enter_task = staticmethod(tasks._c_enter_task) 2794 _leave_task = staticmethod(tasks._c_leave_task) 2795 else: 2796 _register_task = _unregister_task = _enter_task = _leave_task = None 2797 2798 2799class BaseCurrentLoopTests: 2800 2801 def setUp(self): 2802 super().setUp() 2803 self.loop = asyncio.new_event_loop() 2804 self.set_event_loop(self.loop) 2805 2806 def new_task(self, coro): 2807 raise NotImplementedError 2808 2809 def test_current_task_no_running_loop(self): 2810 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2811 2812 def test_current_task_no_running_loop_implicit(self): 2813 with self.assertRaisesRegex(RuntimeError, 'no running event loop'): 2814 asyncio.current_task() 2815 2816 def test_current_task_with_implicit_loop(self): 2817 async def coro(): 2818 self.assertIs(asyncio.current_task(loop=self.loop), task) 2819 2820 self.assertIs(asyncio.current_task(None), task) 2821 self.assertIs(asyncio.current_task(), task) 2822 2823 task = self.new_task(coro()) 2824 self.loop.run_until_complete(task) 2825 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2826 2827 2828class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 2829 2830 def new_task(self, coro): 2831 return tasks._PyTask(coro, loop=self.loop) 2832 2833 2834@unittest.skipUnless(hasattr(tasks, '_CTask'), 2835 'requires the C _asyncio module') 2836class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 2837 2838 def new_task(self, coro): 2839 return getattr(tasks, '_CTask')(coro, loop=self.loop) 2840 2841 2842class GenericTaskTests(test_utils.TestCase): 2843 2844 def test_future_subclass(self): 2845 self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) 2846 2847 @support.cpython_only 2848 def test_asyncio_module_compiled(self): 2849 # Because of circular imports it's easy to make _asyncio 2850 # module non-importable. This is a simple test that will 2851 # fail on systems where C modules were successfully compiled 2852 # (hence the test for _functools etc), but _asyncio somehow didn't. 2853 try: 2854 import _functools 2855 import _json 2856 import _pickle 2857 except ImportError: 2858 self.skipTest('C modules are not available') 2859 else: 2860 try: 2861 import _asyncio 2862 except ImportError: 2863 self.fail('_asyncio module is missing') 2864 2865 2866class GatherTestsBase: 2867 2868 def setUp(self): 2869 super().setUp() 2870 self.one_loop = self.new_test_loop() 2871 self.other_loop = self.new_test_loop() 2872 self.set_event_loop(self.one_loop, cleanup=False) 2873 2874 def _run_loop(self, loop): 2875 while loop._ready: 2876 test_utils.run_briefly(loop) 2877 2878 def _check_success(self, **kwargs): 2879 a, b, c = [self.one_loop.create_future() for i in range(3)] 2880 fut = self._gather(*self.wrap_futures(a, b, c), **kwargs) 2881 cb = test_utils.MockCallback() 2882 fut.add_done_callback(cb) 2883 b.set_result(1) 2884 a.set_result(2) 2885 self._run_loop(self.one_loop) 2886 self.assertEqual(cb.called, False) 2887 self.assertFalse(fut.done()) 2888 c.set_result(3) 2889 self._run_loop(self.one_loop) 2890 cb.assert_called_once_with(fut) 2891 self.assertEqual(fut.result(), [2, 1, 3]) 2892 2893 def test_success(self): 2894 self._check_success() 2895 self._check_success(return_exceptions=False) 2896 2897 def test_result_exception_success(self): 2898 self._check_success(return_exceptions=True) 2899 2900 def test_one_exception(self): 2901 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 2902 fut = self._gather(*self.wrap_futures(a, b, c, d, e)) 2903 cb = test_utils.MockCallback() 2904 fut.add_done_callback(cb) 2905 exc = ZeroDivisionError() 2906 a.set_result(1) 2907 b.set_exception(exc) 2908 self._run_loop(self.one_loop) 2909 self.assertTrue(fut.done()) 2910 cb.assert_called_once_with(fut) 2911 self.assertIs(fut.exception(), exc) 2912 # Does nothing 2913 c.set_result(3) 2914 d.cancel() 2915 e.set_exception(RuntimeError()) 2916 e.exception() 2917 2918 def test_return_exceptions(self): 2919 a, b, c, d = [self.one_loop.create_future() for i in range(4)] 2920 fut = self._gather(*self.wrap_futures(a, b, c, d), 2921 return_exceptions=True) 2922 cb = test_utils.MockCallback() 2923 fut.add_done_callback(cb) 2924 exc = ZeroDivisionError() 2925 exc2 = RuntimeError() 2926 b.set_result(1) 2927 c.set_exception(exc) 2928 a.set_result(3) 2929 self._run_loop(self.one_loop) 2930 self.assertFalse(fut.done()) 2931 d.set_exception(exc2) 2932 self._run_loop(self.one_loop) 2933 self.assertTrue(fut.done()) 2934 cb.assert_called_once_with(fut) 2935 self.assertEqual(fut.result(), [3, 1, exc, exc2]) 2936 2937 def test_env_var_debug(self): 2938 code = '\n'.join(( 2939 'import asyncio.coroutines', 2940 'print(asyncio.coroutines._is_debug_mode())')) 2941 2942 # Test with -E to not fail if the unit test was run with 2943 # PYTHONASYNCIODEBUG set to a non-empty string 2944 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 2945 self.assertEqual(stdout.rstrip(), b'False') 2946 2947 sts, stdout, stderr = assert_python_ok('-c', code, 2948 PYTHONASYNCIODEBUG='', 2949 PYTHONDEVMODE='') 2950 self.assertEqual(stdout.rstrip(), b'False') 2951 2952 sts, stdout, stderr = assert_python_ok('-c', code, 2953 PYTHONASYNCIODEBUG='1', 2954 PYTHONDEVMODE='') 2955 self.assertEqual(stdout.rstrip(), b'True') 2956 2957 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 2958 PYTHONASYNCIODEBUG='1', 2959 PYTHONDEVMODE='') 2960 self.assertEqual(stdout.rstrip(), b'False') 2961 2962 # -X dev 2963 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 2964 '-c', code) 2965 self.assertEqual(stdout.rstrip(), b'True') 2966 2967 2968class FutureGatherTests(GatherTestsBase, test_utils.TestCase): 2969 2970 def wrap_futures(self, *futures): 2971 return futures 2972 2973 def _gather(self, *args, **kwargs): 2974 return asyncio.gather(*args, **kwargs) 2975 2976 def test_constructor_empty_sequence_without_loop(self): 2977 with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 2978 asyncio.gather() 2979 2980 def test_constructor_empty_sequence_use_running_loop(self): 2981 async def gather(): 2982 return asyncio.gather() 2983 fut = self.one_loop.run_until_complete(gather()) 2984 self.assertIsInstance(fut, asyncio.Future) 2985 self.assertIs(fut._loop, self.one_loop) 2986 self._run_loop(self.one_loop) 2987 self.assertTrue(fut.done()) 2988 self.assertEqual(fut.result(), []) 2989 2990 def test_constructor_empty_sequence_use_global_loop(self): 2991 # Deprecated in 3.10, undeprecated in 3.11.1 2992 asyncio.set_event_loop(self.one_loop) 2993 self.addCleanup(asyncio.set_event_loop, None) 2994 fut = asyncio.gather() 2995 self.assertIsInstance(fut, asyncio.Future) 2996 self.assertIs(fut._loop, self.one_loop) 2997 self._run_loop(self.one_loop) 2998 self.assertTrue(fut.done()) 2999 self.assertEqual(fut.result(), []) 3000 3001 def test_constructor_heterogenous_futures(self): 3002 fut1 = self.one_loop.create_future() 3003 fut2 = self.other_loop.create_future() 3004 with self.assertRaises(ValueError): 3005 asyncio.gather(fut1, fut2) 3006 3007 def test_constructor_homogenous_futures(self): 3008 children = [self.other_loop.create_future() for i in range(3)] 3009 fut = asyncio.gather(*children) 3010 self.assertIs(fut._loop, self.other_loop) 3011 self._run_loop(self.other_loop) 3012 self.assertFalse(fut.done()) 3013 fut = asyncio.gather(*children) 3014 self.assertIs(fut._loop, self.other_loop) 3015 self._run_loop(self.other_loop) 3016 self.assertFalse(fut.done()) 3017 3018 def test_one_cancellation(self): 3019 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 3020 fut = asyncio.gather(a, b, c, d, e) 3021 cb = test_utils.MockCallback() 3022 fut.add_done_callback(cb) 3023 a.set_result(1) 3024 b.cancel() 3025 self._run_loop(self.one_loop) 3026 self.assertTrue(fut.done()) 3027 cb.assert_called_once_with(fut) 3028 self.assertFalse(fut.cancelled()) 3029 self.assertIsInstance(fut.exception(), asyncio.CancelledError) 3030 # Does nothing 3031 c.set_result(3) 3032 d.cancel() 3033 e.set_exception(RuntimeError()) 3034 e.exception() 3035 3036 def test_result_exception_one_cancellation(self): 3037 a, b, c, d, e, f = [self.one_loop.create_future() 3038 for i in range(6)] 3039 fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) 3040 cb = test_utils.MockCallback() 3041 fut.add_done_callback(cb) 3042 a.set_result(1) 3043 zde = ZeroDivisionError() 3044 b.set_exception(zde) 3045 c.cancel() 3046 self._run_loop(self.one_loop) 3047 self.assertFalse(fut.done()) 3048 d.set_result(3) 3049 e.cancel() 3050 rte = RuntimeError() 3051 f.set_exception(rte) 3052 res = self.one_loop.run_until_complete(fut) 3053 self.assertIsInstance(res[2], asyncio.CancelledError) 3054 self.assertIsInstance(res[4], asyncio.CancelledError) 3055 res[2] = res[4] = None 3056 self.assertEqual(res, [1, zde, None, 3, None, rte]) 3057 cb.assert_called_once_with(fut) 3058 3059 3060class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): 3061 3062 def wrap_futures(self, *futures): 3063 coros = [] 3064 for fut in futures: 3065 async def coro(fut=fut): 3066 return await fut 3067 coros.append(coro()) 3068 return coros 3069 3070 def _gather(self, *args, **kwargs): 3071 async def coro(): 3072 return asyncio.gather(*args, **kwargs) 3073 return self.one_loop.run_until_complete(coro()) 3074 3075 def test_constructor_without_loop(self): 3076 async def coro(): 3077 return 'abc' 3078 gen1 = coro() 3079 self.addCleanup(gen1.close) 3080 gen2 = coro() 3081 self.addCleanup(gen2.close) 3082 with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 3083 asyncio.gather(gen1, gen2) 3084 3085 def test_constructor_use_running_loop(self): 3086 async def coro(): 3087 return 'abc' 3088 gen1 = coro() 3089 gen2 = coro() 3090 async def gather(): 3091 return asyncio.gather(gen1, gen2) 3092 fut = self.one_loop.run_until_complete(gather()) 3093 self.assertIs(fut._loop, self.one_loop) 3094 self.one_loop.run_until_complete(fut) 3095 3096 def test_constructor_use_global_loop(self): 3097 # Deprecated in 3.10, undeprecated in 3.11.1 3098 async def coro(): 3099 return 'abc' 3100 asyncio.set_event_loop(self.other_loop) 3101 self.addCleanup(asyncio.set_event_loop, None) 3102 gen1 = coro() 3103 gen2 = coro() 3104 fut = asyncio.gather(gen1, gen2) 3105 self.assertIs(fut._loop, self.other_loop) 3106 self.other_loop.run_until_complete(fut) 3107 3108 def test_duplicate_coroutines(self): 3109 async def coro(s): 3110 return s 3111 c = coro('abc') 3112 fut = self._gather(c, c, coro('def'), c) 3113 self._run_loop(self.one_loop) 3114 self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) 3115 3116 def test_cancellation_broadcast(self): 3117 # Cancelling outer() cancels all children. 3118 proof = 0 3119 waiter = self.one_loop.create_future() 3120 3121 async def inner(): 3122 nonlocal proof 3123 await waiter 3124 proof += 1 3125 3126 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 3127 child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 3128 gatherer = None 3129 3130 async def outer(): 3131 nonlocal proof, gatherer 3132 gatherer = asyncio.gather(child1, child2) 3133 await gatherer 3134 proof += 100 3135 3136 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3137 test_utils.run_briefly(self.one_loop) 3138 self.assertTrue(f.cancel()) 3139 with self.assertRaises(asyncio.CancelledError): 3140 self.one_loop.run_until_complete(f) 3141 self.assertFalse(gatherer.cancel()) 3142 self.assertTrue(waiter.cancelled()) 3143 self.assertTrue(child1.cancelled()) 3144 self.assertTrue(child2.cancelled()) 3145 test_utils.run_briefly(self.one_loop) 3146 self.assertEqual(proof, 0) 3147 3148 def test_exception_marking(self): 3149 # Test for the first line marked "Mark exception retrieved." 3150 3151 async def inner(f): 3152 await f 3153 raise RuntimeError('should not be ignored') 3154 3155 a = self.one_loop.create_future() 3156 b = self.one_loop.create_future() 3157 3158 async def outer(): 3159 await asyncio.gather(inner(a), inner(b)) 3160 3161 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3162 test_utils.run_briefly(self.one_loop) 3163 a.set_result(None) 3164 test_utils.run_briefly(self.one_loop) 3165 b.set_result(None) 3166 test_utils.run_briefly(self.one_loop) 3167 self.assertIsInstance(f.exception(), RuntimeError) 3168 3169 def test_issue46672(self): 3170 with mock.patch( 3171 'asyncio.base_events.BaseEventLoop.call_exception_handler', 3172 ): 3173 async def coro(s): 3174 return s 3175 c = coro('abc') 3176 3177 with self.assertRaises(TypeError): 3178 self._gather(c, {}) 3179 self._run_loop(self.one_loop) 3180 # NameError should not happen: 3181 self.one_loop.call_exception_handler.assert_not_called() 3182 3183 3184class RunCoroutineThreadsafeTests(test_utils.TestCase): 3185 """Test case for asyncio.run_coroutine_threadsafe.""" 3186 3187 def setUp(self): 3188 super().setUp() 3189 self.loop = asyncio.new_event_loop() 3190 self.set_event_loop(self.loop) # Will cleanup properly 3191 3192 async def add(self, a, b, fail=False, cancel=False): 3193 """Wait 0.05 second and return a + b.""" 3194 await asyncio.sleep(0.05) 3195 if fail: 3196 raise RuntimeError("Fail!") 3197 if cancel: 3198 asyncio.current_task(self.loop).cancel() 3199 await asyncio.sleep(0) 3200 return a + b 3201 3202 def target(self, fail=False, cancel=False, timeout=None, 3203 advance_coro=False): 3204 """Run add coroutine in the event loop.""" 3205 coro = self.add(1, 2, fail=fail, cancel=cancel) 3206 future = asyncio.run_coroutine_threadsafe(coro, self.loop) 3207 if advance_coro: 3208 # this is for test_run_coroutine_threadsafe_task_factory_exception; 3209 # otherwise it spills errors and breaks **other** unittests, since 3210 # 'target' is interacting with threads. 3211 3212 # With this call, `coro` will be advanced. 3213 self.loop.call_soon_threadsafe(coro.send, None) 3214 try: 3215 return future.result(timeout) 3216 finally: 3217 future.done() or future.cancel() 3218 3219 def test_run_coroutine_threadsafe(self): 3220 """Test coroutine submission from a thread to an event loop.""" 3221 future = self.loop.run_in_executor(None, self.target) 3222 result = self.loop.run_until_complete(future) 3223 self.assertEqual(result, 3) 3224 3225 def test_run_coroutine_threadsafe_with_exception(self): 3226 """Test coroutine submission from a thread to an event loop 3227 when an exception is raised.""" 3228 future = self.loop.run_in_executor(None, self.target, True) 3229 with self.assertRaises(RuntimeError) as exc_context: 3230 self.loop.run_until_complete(future) 3231 self.assertIn("Fail!", exc_context.exception.args) 3232 3233 def test_run_coroutine_threadsafe_with_timeout(self): 3234 """Test coroutine submission from a thread to an event loop 3235 when a timeout is raised.""" 3236 callback = lambda: self.target(timeout=0) 3237 future = self.loop.run_in_executor(None, callback) 3238 with self.assertRaises(asyncio.TimeoutError): 3239 self.loop.run_until_complete(future) 3240 test_utils.run_briefly(self.loop) 3241 # Check that there's no pending task (add has been cancelled) 3242 for task in asyncio.all_tasks(self.loop): 3243 self.assertTrue(task.done()) 3244 3245 def test_run_coroutine_threadsafe_task_cancelled(self): 3246 """Test coroutine submission from a thread to an event loop 3247 when the task is cancelled.""" 3248 callback = lambda: self.target(cancel=True) 3249 future = self.loop.run_in_executor(None, callback) 3250 with self.assertRaises(asyncio.CancelledError): 3251 self.loop.run_until_complete(future) 3252 3253 def test_run_coroutine_threadsafe_task_factory_exception(self): 3254 """Test coroutine submission from a thread to an event loop 3255 when the task factory raise an exception.""" 3256 3257 def task_factory(loop, coro): 3258 raise NameError 3259 3260 run = self.loop.run_in_executor( 3261 None, lambda: self.target(advance_coro=True)) 3262 3263 # Set exception handler 3264 callback = test_utils.MockCallback() 3265 self.loop.set_exception_handler(callback) 3266 3267 # Set corrupted task factory 3268 self.addCleanup(self.loop.set_task_factory, 3269 self.loop.get_task_factory()) 3270 self.loop.set_task_factory(task_factory) 3271 3272 # Run event loop 3273 with self.assertRaises(NameError) as exc_context: 3274 self.loop.run_until_complete(run) 3275 3276 # Check exceptions 3277 self.assertEqual(len(callback.call_args_list), 1) 3278 (loop, context), kwargs = callback.call_args 3279 self.assertEqual(context['exception'], exc_context.exception) 3280 3281 3282class SleepTests(test_utils.TestCase): 3283 def setUp(self): 3284 super().setUp() 3285 self.loop = asyncio.new_event_loop() 3286 self.set_event_loop(self.loop) 3287 3288 def tearDown(self): 3289 self.loop.close() 3290 self.loop = None 3291 super().tearDown() 3292 3293 def test_sleep_zero(self): 3294 result = 0 3295 3296 def inc_result(num): 3297 nonlocal result 3298 result += num 3299 3300 async def coro(): 3301 self.loop.call_soon(inc_result, 1) 3302 self.assertEqual(result, 0) 3303 num = await asyncio.sleep(0, result=10) 3304 self.assertEqual(result, 1) # inc'ed by call_soon 3305 inc_result(num) # num should be 11 3306 3307 self.loop.run_until_complete(coro()) 3308 self.assertEqual(result, 11) 3309 3310 3311class CompatibilityTests(test_utils.TestCase): 3312 # Tests for checking a bridge between old-styled coroutines 3313 # and async/await syntax 3314 3315 def setUp(self): 3316 super().setUp() 3317 self.loop = asyncio.new_event_loop() 3318 self.set_event_loop(self.loop) 3319 3320 def tearDown(self): 3321 self.loop.close() 3322 self.loop = None 3323 super().tearDown() 3324 3325 3326if __name__ == '__main__': 3327 unittest.main() 3328