1"""Tests for locks.py""" 2 3import unittest 4from unittest import mock 5import re 6 7import asyncio 8import collections 9 10STR_RGX_REPR = ( 11 r'^<(?P<class>.*?) object at (?P<address>.*?)' 12 r'\[(?P<extras>' 13 r'(set|unset|locked|unlocked|filling|draining|resetting|broken)' 14 r'(, value:\d)?' 15 r'(, waiters:\d+)?' 16 r'(, waiters:\d+\/\d+)?' # barrier 17 r')\]>\Z' 18) 19RGX_REPR = re.compile(STR_RGX_REPR) 20 21 22def tearDownModule(): 23 asyncio.set_event_loop_policy(None) 24 25 26class LockTests(unittest.IsolatedAsyncioTestCase): 27 28 async def test_repr(self): 29 lock = asyncio.Lock() 30 self.assertTrue(repr(lock).endswith('[unlocked]>')) 31 self.assertTrue(RGX_REPR.match(repr(lock))) 32 33 await lock.acquire() 34 self.assertTrue(repr(lock).endswith('[locked]>')) 35 self.assertTrue(RGX_REPR.match(repr(lock))) 36 37 async def test_lock(self): 38 lock = asyncio.Lock() 39 40 with self.assertRaisesRegex( 41 TypeError, 42 "object Lock can't be used in 'await' expression" 43 ): 44 await lock 45 46 self.assertFalse(lock.locked()) 47 48 async def test_lock_doesnt_accept_loop_parameter(self): 49 primitives_cls = [ 50 asyncio.Lock, 51 asyncio.Condition, 52 asyncio.Event, 53 asyncio.Semaphore, 54 asyncio.BoundedSemaphore, 55 ] 56 57 loop = asyncio.get_running_loop() 58 59 for cls in primitives_cls: 60 with self.assertRaisesRegex( 61 TypeError, 62 rf"{cls.__name__}\.__init__\(\) got an unexpected " 63 rf"keyword argument 'loop'" 64 ): 65 cls(loop=loop) 66 67 async def test_lock_by_with_statement(self): 68 primitives = [ 69 asyncio.Lock(), 70 asyncio.Condition(), 71 asyncio.Semaphore(), 72 asyncio.BoundedSemaphore(), 73 ] 74 75 for lock in primitives: 76 await asyncio.sleep(0.01) 77 self.assertFalse(lock.locked()) 78 with self.assertRaisesRegex( 79 TypeError, 80 r"object \w+ can't be used in 'await' expression" 81 ): 82 with await lock: 83 pass 84 self.assertFalse(lock.locked()) 85 86 async def test_acquire(self): 87 lock = asyncio.Lock() 88 result = [] 89 90 self.assertTrue(await lock.acquire()) 91 92 async def c1(result): 93 if await lock.acquire(): 94 result.append(1) 95 return True 96 97 async def c2(result): 98 if await lock.acquire(): 99 result.append(2) 100 return True 101 102 async def c3(result): 103 if await lock.acquire(): 104 result.append(3) 105 return True 106 107 t1 = asyncio.create_task(c1(result)) 108 t2 = asyncio.create_task(c2(result)) 109 110 await asyncio.sleep(0) 111 self.assertEqual([], result) 112 113 lock.release() 114 await asyncio.sleep(0) 115 self.assertEqual([1], result) 116 117 await asyncio.sleep(0) 118 self.assertEqual([1], result) 119 120 t3 = asyncio.create_task(c3(result)) 121 122 lock.release() 123 await asyncio.sleep(0) 124 self.assertEqual([1, 2], result) 125 126 lock.release() 127 await asyncio.sleep(0) 128 self.assertEqual([1, 2, 3], result) 129 130 self.assertTrue(t1.done()) 131 self.assertTrue(t1.result()) 132 self.assertTrue(t2.done()) 133 self.assertTrue(t2.result()) 134 self.assertTrue(t3.done()) 135 self.assertTrue(t3.result()) 136 137 async def test_acquire_cancel(self): 138 lock = asyncio.Lock() 139 self.assertTrue(await lock.acquire()) 140 141 task = asyncio.create_task(lock.acquire()) 142 asyncio.get_running_loop().call_soon(task.cancel) 143 with self.assertRaises(asyncio.CancelledError): 144 await task 145 self.assertFalse(lock._waiters) 146 147 async def test_cancel_race(self): 148 # Several tasks: 149 # - A acquires the lock 150 # - B is blocked in acquire() 151 # - C is blocked in acquire() 152 # 153 # Now, concurrently: 154 # - B is cancelled 155 # - A releases the lock 156 # 157 # If B's waiter is marked cancelled but not yet removed from 158 # _waiters, A's release() call will crash when trying to set 159 # B's waiter; instead, it should move on to C's waiter. 160 161 # Setup: A has the lock, b and c are waiting. 162 lock = asyncio.Lock() 163 164 async def lockit(name, blocker): 165 await lock.acquire() 166 try: 167 if blocker is not None: 168 await blocker 169 finally: 170 lock.release() 171 172 fa = asyncio.get_running_loop().create_future() 173 ta = asyncio.create_task(lockit('A', fa)) 174 await asyncio.sleep(0) 175 self.assertTrue(lock.locked()) 176 tb = asyncio.create_task(lockit('B', None)) 177 await asyncio.sleep(0) 178 self.assertEqual(len(lock._waiters), 1) 179 tc = asyncio.create_task(lockit('C', None)) 180 await asyncio.sleep(0) 181 self.assertEqual(len(lock._waiters), 2) 182 183 # Create the race and check. 184 # Without the fix this failed at the last assert. 185 fa.set_result(None) 186 tb.cancel() 187 self.assertTrue(lock._waiters[0].cancelled()) 188 await asyncio.sleep(0) 189 self.assertFalse(lock.locked()) 190 self.assertTrue(ta.done()) 191 self.assertTrue(tb.cancelled()) 192 await tc 193 194 async def test_cancel_release_race(self): 195 # Issue 32734 196 # Acquire 4 locks, cancel second, release first 197 # and 2 locks are taken at once. 198 loop = asyncio.get_running_loop() 199 lock = asyncio.Lock() 200 lock_count = 0 201 call_count = 0 202 203 async def lockit(): 204 nonlocal lock_count 205 nonlocal call_count 206 call_count += 1 207 await lock.acquire() 208 lock_count += 1 209 210 def trigger(): 211 t1.cancel() 212 lock.release() 213 214 await lock.acquire() 215 216 t1 = asyncio.create_task(lockit()) 217 t2 = asyncio.create_task(lockit()) 218 t3 = asyncio.create_task(lockit()) 219 220 # Start scheduled tasks 221 await asyncio.sleep(0) 222 223 loop.call_soon(trigger) 224 with self.assertRaises(asyncio.CancelledError): 225 # Wait for cancellation 226 await t1 227 228 # Make sure only one lock was taken 229 self.assertEqual(lock_count, 1) 230 # While 3 calls were made to lockit() 231 self.assertEqual(call_count, 3) 232 self.assertTrue(t1.cancelled() and t2.done()) 233 234 # Cleanup the task that is stuck on acquire. 235 t3.cancel() 236 await asyncio.sleep(0) 237 self.assertTrue(t3.cancelled()) 238 239 async def test_finished_waiter_cancelled(self): 240 lock = asyncio.Lock() 241 242 await lock.acquire() 243 self.assertTrue(lock.locked()) 244 245 tb = asyncio.create_task(lock.acquire()) 246 await asyncio.sleep(0) 247 self.assertEqual(len(lock._waiters), 1) 248 249 # Create a second waiter, wake up the first, and cancel it. 250 # Without the fix, the second was not woken up. 251 tc = asyncio.create_task(lock.acquire()) 252 tb.cancel() 253 lock.release() 254 await asyncio.sleep(0) 255 256 self.assertTrue(lock.locked()) 257 self.assertTrue(tb.cancelled()) 258 259 # Cleanup 260 await tc 261 262 async def test_release_not_acquired(self): 263 lock = asyncio.Lock() 264 265 self.assertRaises(RuntimeError, lock.release) 266 267 async def test_release_no_waiters(self): 268 lock = asyncio.Lock() 269 await lock.acquire() 270 self.assertTrue(lock.locked()) 271 272 lock.release() 273 self.assertFalse(lock.locked()) 274 275 async def test_context_manager(self): 276 lock = asyncio.Lock() 277 self.assertFalse(lock.locked()) 278 279 async with lock: 280 self.assertTrue(lock.locked()) 281 282 self.assertFalse(lock.locked()) 283 284 285class EventTests(unittest.IsolatedAsyncioTestCase): 286 287 def test_repr(self): 288 ev = asyncio.Event() 289 self.assertTrue(repr(ev).endswith('[unset]>')) 290 match = RGX_REPR.match(repr(ev)) 291 self.assertEqual(match.group('extras'), 'unset') 292 293 ev.set() 294 self.assertTrue(repr(ev).endswith('[set]>')) 295 self.assertTrue(RGX_REPR.match(repr(ev))) 296 297 ev._waiters.append(mock.Mock()) 298 self.assertTrue('waiters:1' in repr(ev)) 299 self.assertTrue(RGX_REPR.match(repr(ev))) 300 301 async def test_wait(self): 302 ev = asyncio.Event() 303 self.assertFalse(ev.is_set()) 304 305 result = [] 306 307 async def c1(result): 308 if await ev.wait(): 309 result.append(1) 310 311 async def c2(result): 312 if await ev.wait(): 313 result.append(2) 314 315 async def c3(result): 316 if await ev.wait(): 317 result.append(3) 318 319 t1 = asyncio.create_task(c1(result)) 320 t2 = asyncio.create_task(c2(result)) 321 322 await asyncio.sleep(0) 323 self.assertEqual([], result) 324 325 t3 = asyncio.create_task(c3(result)) 326 327 ev.set() 328 await asyncio.sleep(0) 329 self.assertEqual([3, 1, 2], result) 330 331 self.assertTrue(t1.done()) 332 self.assertIsNone(t1.result()) 333 self.assertTrue(t2.done()) 334 self.assertIsNone(t2.result()) 335 self.assertTrue(t3.done()) 336 self.assertIsNone(t3.result()) 337 338 async def test_wait_on_set(self): 339 ev = asyncio.Event() 340 ev.set() 341 342 res = await ev.wait() 343 self.assertTrue(res) 344 345 async def test_wait_cancel(self): 346 ev = asyncio.Event() 347 348 wait = asyncio.create_task(ev.wait()) 349 asyncio.get_running_loop().call_soon(wait.cancel) 350 with self.assertRaises(asyncio.CancelledError): 351 await wait 352 self.assertFalse(ev._waiters) 353 354 async def test_clear(self): 355 ev = asyncio.Event() 356 self.assertFalse(ev.is_set()) 357 358 ev.set() 359 self.assertTrue(ev.is_set()) 360 361 ev.clear() 362 self.assertFalse(ev.is_set()) 363 364 async def test_clear_with_waiters(self): 365 ev = asyncio.Event() 366 result = [] 367 368 async def c1(result): 369 if await ev.wait(): 370 result.append(1) 371 return True 372 373 t = asyncio.create_task(c1(result)) 374 await asyncio.sleep(0) 375 self.assertEqual([], result) 376 377 ev.set() 378 ev.clear() 379 self.assertFalse(ev.is_set()) 380 381 ev.set() 382 ev.set() 383 self.assertEqual(1, len(ev._waiters)) 384 385 await asyncio.sleep(0) 386 self.assertEqual([1], result) 387 self.assertEqual(0, len(ev._waiters)) 388 389 self.assertTrue(t.done()) 390 self.assertTrue(t.result()) 391 392 393class ConditionTests(unittest.IsolatedAsyncioTestCase): 394 395 async def test_wait(self): 396 cond = asyncio.Condition() 397 result = [] 398 399 async def c1(result): 400 await cond.acquire() 401 if await cond.wait(): 402 result.append(1) 403 return True 404 405 async def c2(result): 406 await cond.acquire() 407 if await cond.wait(): 408 result.append(2) 409 return True 410 411 async def c3(result): 412 await cond.acquire() 413 if await cond.wait(): 414 result.append(3) 415 return True 416 417 t1 = asyncio.create_task(c1(result)) 418 t2 = asyncio.create_task(c2(result)) 419 t3 = asyncio.create_task(c3(result)) 420 421 await asyncio.sleep(0) 422 self.assertEqual([], result) 423 self.assertFalse(cond.locked()) 424 425 self.assertTrue(await cond.acquire()) 426 cond.notify() 427 await asyncio.sleep(0) 428 self.assertEqual([], result) 429 self.assertTrue(cond.locked()) 430 431 cond.release() 432 await asyncio.sleep(0) 433 self.assertEqual([1], result) 434 self.assertTrue(cond.locked()) 435 436 cond.notify(2) 437 await asyncio.sleep(0) 438 self.assertEqual([1], result) 439 self.assertTrue(cond.locked()) 440 441 cond.release() 442 await asyncio.sleep(0) 443 self.assertEqual([1, 2], result) 444 self.assertTrue(cond.locked()) 445 446 cond.release() 447 await asyncio.sleep(0) 448 self.assertEqual([1, 2, 3], result) 449 self.assertTrue(cond.locked()) 450 451 self.assertTrue(t1.done()) 452 self.assertTrue(t1.result()) 453 self.assertTrue(t2.done()) 454 self.assertTrue(t2.result()) 455 self.assertTrue(t3.done()) 456 self.assertTrue(t3.result()) 457 458 async def test_wait_cancel(self): 459 cond = asyncio.Condition() 460 await cond.acquire() 461 462 wait = asyncio.create_task(cond.wait()) 463 asyncio.get_running_loop().call_soon(wait.cancel) 464 with self.assertRaises(asyncio.CancelledError): 465 await wait 466 self.assertFalse(cond._waiters) 467 self.assertTrue(cond.locked()) 468 469 async def test_wait_cancel_contested(self): 470 cond = asyncio.Condition() 471 472 await cond.acquire() 473 self.assertTrue(cond.locked()) 474 475 wait_task = asyncio.create_task(cond.wait()) 476 await asyncio.sleep(0) 477 self.assertFalse(cond.locked()) 478 479 # Notify, but contest the lock before cancelling 480 await cond.acquire() 481 self.assertTrue(cond.locked()) 482 cond.notify() 483 asyncio.get_running_loop().call_soon(wait_task.cancel) 484 asyncio.get_running_loop().call_soon(cond.release) 485 486 try: 487 await wait_task 488 except asyncio.CancelledError: 489 # Should not happen, since no cancellation points 490 pass 491 492 self.assertTrue(cond.locked()) 493 494 async def test_wait_cancel_after_notify(self): 495 # See bpo-32841 496 waited = False 497 498 cond = asyncio.Condition() 499 500 async def wait_on_cond(): 501 nonlocal waited 502 async with cond: 503 waited = True # Make sure this area was reached 504 await cond.wait() 505 506 waiter = asyncio.create_task(wait_on_cond()) 507 await asyncio.sleep(0) # Start waiting 508 509 await cond.acquire() 510 cond.notify() 511 await asyncio.sleep(0) # Get to acquire() 512 waiter.cancel() 513 await asyncio.sleep(0) # Activate cancellation 514 cond.release() 515 await asyncio.sleep(0) # Cancellation should occur 516 517 self.assertTrue(waiter.cancelled()) 518 self.assertTrue(waited) 519 520 async def test_wait_unacquired(self): 521 cond = asyncio.Condition() 522 with self.assertRaises(RuntimeError): 523 await cond.wait() 524 525 async def test_wait_for(self): 526 cond = asyncio.Condition() 527 presult = False 528 529 def predicate(): 530 return presult 531 532 result = [] 533 534 async def c1(result): 535 await cond.acquire() 536 if await cond.wait_for(predicate): 537 result.append(1) 538 cond.release() 539 return True 540 541 t = asyncio.create_task(c1(result)) 542 543 await asyncio.sleep(0) 544 self.assertEqual([], result) 545 546 await cond.acquire() 547 cond.notify() 548 cond.release() 549 await asyncio.sleep(0) 550 self.assertEqual([], result) 551 552 presult = True 553 await cond.acquire() 554 cond.notify() 555 cond.release() 556 await asyncio.sleep(0) 557 self.assertEqual([1], result) 558 559 self.assertTrue(t.done()) 560 self.assertTrue(t.result()) 561 562 async def test_wait_for_unacquired(self): 563 cond = asyncio.Condition() 564 565 # predicate can return true immediately 566 res = await cond.wait_for(lambda: [1, 2, 3]) 567 self.assertEqual([1, 2, 3], res) 568 569 with self.assertRaises(RuntimeError): 570 await cond.wait_for(lambda: False) 571 572 async def test_notify(self): 573 cond = asyncio.Condition() 574 result = [] 575 576 async def c1(result): 577 await cond.acquire() 578 if await cond.wait(): 579 result.append(1) 580 cond.release() 581 return True 582 583 async def c2(result): 584 await cond.acquire() 585 if await cond.wait(): 586 result.append(2) 587 cond.release() 588 return True 589 590 async def c3(result): 591 await cond.acquire() 592 if await cond.wait(): 593 result.append(3) 594 cond.release() 595 return True 596 597 t1 = asyncio.create_task(c1(result)) 598 t2 = asyncio.create_task(c2(result)) 599 t3 = asyncio.create_task(c3(result)) 600 601 await asyncio.sleep(0) 602 self.assertEqual([], result) 603 604 await cond.acquire() 605 cond.notify(1) 606 cond.release() 607 await asyncio.sleep(0) 608 self.assertEqual([1], result) 609 610 await cond.acquire() 611 cond.notify(1) 612 cond.notify(2048) 613 cond.release() 614 await asyncio.sleep(0) 615 self.assertEqual([1, 2, 3], result) 616 617 self.assertTrue(t1.done()) 618 self.assertTrue(t1.result()) 619 self.assertTrue(t2.done()) 620 self.assertTrue(t2.result()) 621 self.assertTrue(t3.done()) 622 self.assertTrue(t3.result()) 623 624 async def test_notify_all(self): 625 cond = asyncio.Condition() 626 627 result = [] 628 629 async def c1(result): 630 await cond.acquire() 631 if await cond.wait(): 632 result.append(1) 633 cond.release() 634 return True 635 636 async def c2(result): 637 await cond.acquire() 638 if await cond.wait(): 639 result.append(2) 640 cond.release() 641 return True 642 643 t1 = asyncio.create_task(c1(result)) 644 t2 = asyncio.create_task(c2(result)) 645 646 await asyncio.sleep(0) 647 self.assertEqual([], result) 648 649 await cond.acquire() 650 cond.notify_all() 651 cond.release() 652 await asyncio.sleep(0) 653 self.assertEqual([1, 2], result) 654 655 self.assertTrue(t1.done()) 656 self.assertTrue(t1.result()) 657 self.assertTrue(t2.done()) 658 self.assertTrue(t2.result()) 659 660 def test_notify_unacquired(self): 661 cond = asyncio.Condition() 662 self.assertRaises(RuntimeError, cond.notify) 663 664 def test_notify_all_unacquired(self): 665 cond = asyncio.Condition() 666 self.assertRaises(RuntimeError, cond.notify_all) 667 668 async def test_repr(self): 669 cond = asyncio.Condition() 670 self.assertTrue('unlocked' in repr(cond)) 671 self.assertTrue(RGX_REPR.match(repr(cond))) 672 673 await cond.acquire() 674 self.assertTrue('locked' in repr(cond)) 675 676 cond._waiters.append(mock.Mock()) 677 self.assertTrue('waiters:1' in repr(cond)) 678 self.assertTrue(RGX_REPR.match(repr(cond))) 679 680 cond._waiters.append(mock.Mock()) 681 self.assertTrue('waiters:2' in repr(cond)) 682 self.assertTrue(RGX_REPR.match(repr(cond))) 683 684 async def test_context_manager(self): 685 cond = asyncio.Condition() 686 self.assertFalse(cond.locked()) 687 async with cond: 688 self.assertTrue(cond.locked()) 689 self.assertFalse(cond.locked()) 690 691 async def test_explicit_lock(self): 692 async def f(lock=None, cond=None): 693 if lock is None: 694 lock = asyncio.Lock() 695 if cond is None: 696 cond = asyncio.Condition(lock) 697 self.assertIs(cond._lock, lock) 698 self.assertFalse(lock.locked()) 699 self.assertFalse(cond.locked()) 700 async with cond: 701 self.assertTrue(lock.locked()) 702 self.assertTrue(cond.locked()) 703 self.assertFalse(lock.locked()) 704 self.assertFalse(cond.locked()) 705 async with lock: 706 self.assertTrue(lock.locked()) 707 self.assertTrue(cond.locked()) 708 self.assertFalse(lock.locked()) 709 self.assertFalse(cond.locked()) 710 711 # All should work in the same way. 712 await f() 713 await f(asyncio.Lock()) 714 lock = asyncio.Lock() 715 await f(lock, asyncio.Condition(lock)) 716 717 async def test_ambiguous_loops(self): 718 loop = asyncio.new_event_loop() 719 self.addCleanup(loop.close) 720 721 async def wrong_loop_in_lock(): 722 with self.assertRaises(TypeError): 723 asyncio.Lock(loop=loop) # actively disallowed since 3.10 724 lock = asyncio.Lock() 725 lock._loop = loop # use private API for testing 726 async with lock: 727 # acquired immediately via the fast-path 728 # without interaction with any event loop. 729 cond = asyncio.Condition(lock) 730 # cond.acquire() will trigger waiting on the lock 731 # and it will discover the event loop mismatch. 732 with self.assertRaisesRegex( 733 RuntimeError, 734 "is bound to a different event loop", 735 ): 736 await cond.acquire() 737 738 async def wrong_loop_in_cond(): 739 # Same analogy here with the condition's loop. 740 lock = asyncio.Lock() 741 async with lock: 742 with self.assertRaises(TypeError): 743 asyncio.Condition(lock, loop=loop) 744 cond = asyncio.Condition(lock) 745 cond._loop = loop 746 with self.assertRaisesRegex( 747 RuntimeError, 748 "is bound to a different event loop", 749 ): 750 await cond.wait() 751 752 await wrong_loop_in_lock() 753 await wrong_loop_in_cond() 754 755 async def test_timeout_in_block(self): 756 condition = asyncio.Condition() 757 async with condition: 758 with self.assertRaises(asyncio.TimeoutError): 759 await asyncio.wait_for(condition.wait(), timeout=0.5) 760 761 762class SemaphoreTests(unittest.IsolatedAsyncioTestCase): 763 764 def test_initial_value_zero(self): 765 sem = asyncio.Semaphore(0) 766 self.assertTrue(sem.locked()) 767 768 async def test_repr(self): 769 sem = asyncio.Semaphore() 770 self.assertTrue(repr(sem).endswith('[unlocked, value:1]>')) 771 self.assertTrue(RGX_REPR.match(repr(sem))) 772 773 await sem.acquire() 774 self.assertTrue(repr(sem).endswith('[locked]>')) 775 self.assertTrue('waiters' not in repr(sem)) 776 self.assertTrue(RGX_REPR.match(repr(sem))) 777 778 if sem._waiters is None: 779 sem._waiters = collections.deque() 780 781 sem._waiters.append(mock.Mock()) 782 self.assertTrue('waiters:1' in repr(sem)) 783 self.assertTrue(RGX_REPR.match(repr(sem))) 784 785 sem._waiters.append(mock.Mock()) 786 self.assertTrue('waiters:2' in repr(sem)) 787 self.assertTrue(RGX_REPR.match(repr(sem))) 788 789 async def test_semaphore(self): 790 sem = asyncio.Semaphore() 791 self.assertEqual(1, sem._value) 792 793 with self.assertRaisesRegex( 794 TypeError, 795 "object Semaphore can't be used in 'await' expression", 796 ): 797 await sem 798 799 self.assertFalse(sem.locked()) 800 self.assertEqual(1, sem._value) 801 802 def test_semaphore_value(self): 803 self.assertRaises(ValueError, asyncio.Semaphore, -1) 804 805 async def test_acquire(self): 806 sem = asyncio.Semaphore(3) 807 result = [] 808 809 self.assertTrue(await sem.acquire()) 810 self.assertTrue(await sem.acquire()) 811 self.assertFalse(sem.locked()) 812 813 async def c1(result): 814 await sem.acquire() 815 result.append(1) 816 return True 817 818 async def c2(result): 819 await sem.acquire() 820 result.append(2) 821 return True 822 823 async def c3(result): 824 await sem.acquire() 825 result.append(3) 826 return True 827 828 async def c4(result): 829 await sem.acquire() 830 result.append(4) 831 return True 832 833 t1 = asyncio.create_task(c1(result)) 834 t2 = asyncio.create_task(c2(result)) 835 t3 = asyncio.create_task(c3(result)) 836 837 await asyncio.sleep(0) 838 self.assertEqual([1], result) 839 self.assertTrue(sem.locked()) 840 self.assertEqual(2, len(sem._waiters)) 841 self.assertEqual(0, sem._value) 842 843 t4 = asyncio.create_task(c4(result)) 844 845 sem.release() 846 sem.release() 847 self.assertEqual(0, sem._value) 848 849 await asyncio.sleep(0) 850 self.assertEqual(0, sem._value) 851 self.assertEqual(3, len(result)) 852 self.assertTrue(sem.locked()) 853 self.assertEqual(1, len(sem._waiters)) 854 self.assertEqual(0, sem._value) 855 856 self.assertTrue(t1.done()) 857 self.assertTrue(t1.result()) 858 race_tasks = [t2, t3, t4] 859 done_tasks = [t for t in race_tasks if t.done() and t.result()] 860 self.assertEqual(2, len(done_tasks)) 861 862 # cleanup locked semaphore 863 sem.release() 864 await asyncio.gather(*race_tasks) 865 866 async def test_acquire_cancel(self): 867 sem = asyncio.Semaphore() 868 await sem.acquire() 869 870 acquire = asyncio.create_task(sem.acquire()) 871 asyncio.get_running_loop().call_soon(acquire.cancel) 872 with self.assertRaises(asyncio.CancelledError): 873 await acquire 874 self.assertTrue((not sem._waiters) or 875 all(waiter.done() for waiter in sem._waiters)) 876 877 async def test_acquire_cancel_before_awoken(self): 878 sem = asyncio.Semaphore(value=0) 879 880 t1 = asyncio.create_task(sem.acquire()) 881 t2 = asyncio.create_task(sem.acquire()) 882 t3 = asyncio.create_task(sem.acquire()) 883 t4 = asyncio.create_task(sem.acquire()) 884 885 await asyncio.sleep(0) 886 887 t1.cancel() 888 t2.cancel() 889 sem.release() 890 891 await asyncio.sleep(0) 892 await asyncio.sleep(0) 893 num_done = sum(t.done() for t in [t3, t4]) 894 self.assertEqual(num_done, 1) 895 self.assertTrue(t3.done()) 896 self.assertFalse(t4.done()) 897 898 t3.cancel() 899 t4.cancel() 900 await asyncio.sleep(0) 901 902 async def test_acquire_hang(self): 903 sem = asyncio.Semaphore(value=0) 904 905 t1 = asyncio.create_task(sem.acquire()) 906 t2 = asyncio.create_task(sem.acquire()) 907 await asyncio.sleep(0) 908 909 t1.cancel() 910 sem.release() 911 await asyncio.sleep(0) 912 await asyncio.sleep(0) 913 self.assertTrue(sem.locked()) 914 self.assertTrue(t2.done()) 915 916 async def test_acquire_no_hang(self): 917 918 sem = asyncio.Semaphore(1) 919 920 async def c1(): 921 async with sem: 922 await asyncio.sleep(0) 923 t2.cancel() 924 925 async def c2(): 926 async with sem: 927 self.assertFalse(True) 928 929 t1 = asyncio.create_task(c1()) 930 t2 = asyncio.create_task(c2()) 931 932 r1, r2 = await asyncio.gather(t1, t2, return_exceptions=True) 933 self.assertTrue(r1 is None) 934 self.assertTrue(isinstance(r2, asyncio.CancelledError)) 935 936 await asyncio.wait_for(sem.acquire(), timeout=1.0) 937 938 def test_release_not_acquired(self): 939 sem = asyncio.BoundedSemaphore() 940 941 self.assertRaises(ValueError, sem.release) 942 943 async def test_release_no_waiters(self): 944 sem = asyncio.Semaphore() 945 await sem.acquire() 946 self.assertTrue(sem.locked()) 947 948 sem.release() 949 self.assertFalse(sem.locked()) 950 951 async def test_acquire_fifo_order(self): 952 sem = asyncio.Semaphore(1) 953 result = [] 954 955 async def coro(tag): 956 await sem.acquire() 957 result.append(f'{tag}_1') 958 await asyncio.sleep(0.01) 959 sem.release() 960 961 await sem.acquire() 962 result.append(f'{tag}_2') 963 await asyncio.sleep(0.01) 964 sem.release() 965 966 async with asyncio.TaskGroup() as tg: 967 tg.create_task(coro('c1')) 968 tg.create_task(coro('c2')) 969 tg.create_task(coro('c3')) 970 971 self.assertEqual( 972 ['c1_1', 'c2_1', 'c3_1', 'c1_2', 'c2_2', 'c3_2'], 973 result 974 ) 975 976 async def test_acquire_fifo_order_2(self): 977 sem = asyncio.Semaphore(1) 978 result = [] 979 980 async def c1(result): 981 await sem.acquire() 982 result.append(1) 983 return True 984 985 async def c2(result): 986 await sem.acquire() 987 result.append(2) 988 sem.release() 989 await sem.acquire() 990 result.append(4) 991 return True 992 993 async def c3(result): 994 await sem.acquire() 995 result.append(3) 996 return True 997 998 t1 = asyncio.create_task(c1(result)) 999 t2 = asyncio.create_task(c2(result)) 1000 t3 = asyncio.create_task(c3(result)) 1001 1002 await asyncio.sleep(0) 1003 1004 sem.release() 1005 sem.release() 1006 1007 tasks = [t1, t2, t3] 1008 await asyncio.gather(*tasks) 1009 self.assertEqual([1, 2, 3, 4], result) 1010 1011 async def test_acquire_fifo_order_3(self): 1012 sem = asyncio.Semaphore(0) 1013 result = [] 1014 1015 async def c1(result): 1016 await sem.acquire() 1017 result.append(1) 1018 return True 1019 1020 async def c2(result): 1021 await sem.acquire() 1022 result.append(2) 1023 return True 1024 1025 async def c3(result): 1026 await sem.acquire() 1027 result.append(3) 1028 return True 1029 1030 t1 = asyncio.create_task(c1(result)) 1031 t2 = asyncio.create_task(c2(result)) 1032 t3 = asyncio.create_task(c3(result)) 1033 1034 await asyncio.sleep(0) 1035 1036 t1.cancel() 1037 1038 await asyncio.sleep(0) 1039 1040 sem.release() 1041 sem.release() 1042 1043 tasks = [t1, t2, t3] 1044 await asyncio.gather(*tasks, return_exceptions=True) 1045 self.assertEqual([2, 3], result) 1046 1047 1048class BarrierTests(unittest.IsolatedAsyncioTestCase): 1049 1050 async def asyncSetUp(self): 1051 await super().asyncSetUp() 1052 self.N = 5 1053 1054 def make_tasks(self, n, coro): 1055 tasks = [asyncio.create_task(coro()) for _ in range(n)] 1056 return tasks 1057 1058 async def gather_tasks(self, n, coro): 1059 tasks = self.make_tasks(n, coro) 1060 res = await asyncio.gather(*tasks) 1061 return res, tasks 1062 1063 async def test_barrier(self): 1064 barrier = asyncio.Barrier(self.N) 1065 self.assertIn("filling", repr(barrier)) 1066 with self.assertRaisesRegex( 1067 TypeError, 1068 "object Barrier can't be used in 'await' expression", 1069 ): 1070 await barrier 1071 1072 self.assertIn("filling", repr(barrier)) 1073 1074 async def test_repr(self): 1075 barrier = asyncio.Barrier(self.N) 1076 1077 self.assertTrue(RGX_REPR.match(repr(barrier))) 1078 self.assertIn("filling", repr(barrier)) 1079 1080 waiters = [] 1081 async def wait(barrier): 1082 await barrier.wait() 1083 1084 incr = 2 1085 for i in range(incr): 1086 waiters.append(asyncio.create_task(wait(barrier))) 1087 await asyncio.sleep(0) 1088 1089 self.assertTrue(RGX_REPR.match(repr(barrier))) 1090 self.assertTrue(f"waiters:{incr}/{self.N}" in repr(barrier)) 1091 self.assertIn("filling", repr(barrier)) 1092 1093 # create missing waiters 1094 for i in range(barrier.parties - barrier.n_waiting): 1095 waiters.append(asyncio.create_task(wait(barrier))) 1096 await asyncio.sleep(0) 1097 1098 self.assertTrue(RGX_REPR.match(repr(barrier))) 1099 self.assertIn("draining", repr(barrier)) 1100 1101 # add a part of waiters 1102 for i in range(incr): 1103 waiters.append(asyncio.create_task(wait(barrier))) 1104 await asyncio.sleep(0) 1105 # and reset 1106 await barrier.reset() 1107 1108 self.assertTrue(RGX_REPR.match(repr(barrier))) 1109 self.assertIn("resetting", repr(barrier)) 1110 1111 # add a part of waiters again 1112 for i in range(incr): 1113 waiters.append(asyncio.create_task(wait(barrier))) 1114 await asyncio.sleep(0) 1115 # and abort 1116 await barrier.abort() 1117 1118 self.assertTrue(RGX_REPR.match(repr(barrier))) 1119 self.assertIn("broken", repr(barrier)) 1120 self.assertTrue(barrier.broken) 1121 1122 # suppress unhandled exceptions 1123 await asyncio.gather(*waiters, return_exceptions=True) 1124 1125 async def test_barrier_parties(self): 1126 self.assertRaises(ValueError, lambda: asyncio.Barrier(0)) 1127 self.assertRaises(ValueError, lambda: asyncio.Barrier(-4)) 1128 1129 self.assertIsInstance(asyncio.Barrier(self.N), asyncio.Barrier) 1130 1131 async def test_context_manager(self): 1132 self.N = 3 1133 barrier = asyncio.Barrier(self.N) 1134 results = [] 1135 1136 async def coro(): 1137 async with barrier as i: 1138 results.append(i) 1139 1140 await self.gather_tasks(self.N, coro) 1141 1142 self.assertListEqual(sorted(results), list(range(self.N))) 1143 self.assertEqual(barrier.n_waiting, 0) 1144 self.assertFalse(barrier.broken) 1145 1146 async def test_filling_one_task(self): 1147 barrier = asyncio.Barrier(1) 1148 1149 async def f(): 1150 async with barrier as i: 1151 return True 1152 1153 ret = await f() 1154 1155 self.assertTrue(ret) 1156 self.assertEqual(barrier.n_waiting, 0) 1157 self.assertFalse(barrier.broken) 1158 1159 async def test_filling_one_task_twice(self): 1160 barrier = asyncio.Barrier(1) 1161 1162 t1 = asyncio.create_task(barrier.wait()) 1163 await asyncio.sleep(0) 1164 self.assertEqual(barrier.n_waiting, 0) 1165 1166 t2 = asyncio.create_task(barrier.wait()) 1167 await asyncio.sleep(0) 1168 1169 self.assertEqual(t1.result(), t2.result()) 1170 self.assertEqual(t1.done(), t2.done()) 1171 1172 self.assertEqual(barrier.n_waiting, 0) 1173 self.assertFalse(barrier.broken) 1174 1175 async def test_filling_task_by_task(self): 1176 self.N = 3 1177 barrier = asyncio.Barrier(self.N) 1178 1179 t1 = asyncio.create_task(barrier.wait()) 1180 await asyncio.sleep(0) 1181 self.assertEqual(barrier.n_waiting, 1) 1182 self.assertIn("filling", repr(barrier)) 1183 1184 t2 = asyncio.create_task(barrier.wait()) 1185 await asyncio.sleep(0) 1186 self.assertEqual(barrier.n_waiting, 2) 1187 self.assertIn("filling", repr(barrier)) 1188 1189 t3 = asyncio.create_task(barrier.wait()) 1190 await asyncio.sleep(0) 1191 1192 await asyncio.wait([t1, t2, t3]) 1193 1194 self.assertEqual(barrier.n_waiting, 0) 1195 self.assertFalse(barrier.broken) 1196 1197 async def test_filling_tasks_wait_twice(self): 1198 barrier = asyncio.Barrier(self.N) 1199 results = [] 1200 1201 async def coro(): 1202 async with barrier: 1203 results.append(True) 1204 1205 async with barrier: 1206 results.append(False) 1207 1208 await self.gather_tasks(self.N, coro) 1209 1210 self.assertEqual(len(results), self.N*2) 1211 self.assertEqual(results.count(True), self.N) 1212 self.assertEqual(results.count(False), self.N) 1213 1214 self.assertEqual(barrier.n_waiting, 0) 1215 self.assertFalse(barrier.broken) 1216 1217 async def test_filling_tasks_check_return_value(self): 1218 barrier = asyncio.Barrier(self.N) 1219 results1 = [] 1220 results2 = [] 1221 1222 async def coro(): 1223 async with barrier: 1224 results1.append(True) 1225 1226 async with barrier as i: 1227 results2.append(True) 1228 return i 1229 1230 res, _ = await self.gather_tasks(self.N, coro) 1231 1232 self.assertEqual(len(results1), self.N) 1233 self.assertTrue(all(results1)) 1234 self.assertEqual(len(results2), self.N) 1235 self.assertTrue(all(results2)) 1236 self.assertListEqual(sorted(res), list(range(self.N))) 1237 1238 self.assertEqual(barrier.n_waiting, 0) 1239 self.assertFalse(barrier.broken) 1240 1241 async def test_draining_state(self): 1242 barrier = asyncio.Barrier(self.N) 1243 results = [] 1244 1245 async def coro(): 1246 async with barrier: 1247 # barrier state change to filling for the last task release 1248 results.append("draining" in repr(barrier)) 1249 1250 await self.gather_tasks(self.N, coro) 1251 1252 self.assertEqual(len(results), self.N) 1253 self.assertEqual(results[-1], False) 1254 self.assertTrue(all(results[:self.N-1])) 1255 1256 self.assertEqual(barrier.n_waiting, 0) 1257 self.assertFalse(barrier.broken) 1258 1259 async def test_blocking_tasks_while_draining(self): 1260 rewait = 2 1261 barrier = asyncio.Barrier(self.N) 1262 barrier_nowaiting = asyncio.Barrier(self.N - rewait) 1263 results = [] 1264 rewait_n = rewait 1265 counter = 0 1266 1267 async def coro(): 1268 nonlocal rewait_n 1269 1270 # first time waiting 1271 await barrier.wait() 1272 1273 # after wainting once for all tasks 1274 if rewait_n > 0: 1275 rewait_n -= 1 1276 # wait again only for rewait tasks 1277 await barrier.wait() 1278 else: 1279 # wait for end of draining state` 1280 await barrier_nowaiting.wait() 1281 # wait for other waiting tasks 1282 await barrier.wait() 1283 1284 # a success means that barrier_nowaiting 1285 # was waited for exactly N-rewait=3 times 1286 await self.gather_tasks(self.N, coro) 1287 1288 async def test_filling_tasks_cancel_one(self): 1289 self.N = 3 1290 barrier = asyncio.Barrier(self.N) 1291 results = [] 1292 1293 async def coro(): 1294 await barrier.wait() 1295 results.append(True) 1296 1297 t1 = asyncio.create_task(coro()) 1298 await asyncio.sleep(0) 1299 self.assertEqual(barrier.n_waiting, 1) 1300 1301 t2 = asyncio.create_task(coro()) 1302 await asyncio.sleep(0) 1303 self.assertEqual(barrier.n_waiting, 2) 1304 1305 t1.cancel() 1306 await asyncio.sleep(0) 1307 self.assertEqual(barrier.n_waiting, 1) 1308 with self.assertRaises(asyncio.CancelledError): 1309 await t1 1310 self.assertTrue(t1.cancelled()) 1311 1312 t3 = asyncio.create_task(coro()) 1313 await asyncio.sleep(0) 1314 self.assertEqual(barrier.n_waiting, 2) 1315 1316 t4 = asyncio.create_task(coro()) 1317 await asyncio.gather(t2, t3, t4) 1318 1319 self.assertEqual(len(results), self.N) 1320 self.assertTrue(all(results)) 1321 1322 self.assertEqual(barrier.n_waiting, 0) 1323 self.assertFalse(barrier.broken) 1324 1325 async def test_reset_barrier(self): 1326 barrier = asyncio.Barrier(1) 1327 1328 asyncio.create_task(barrier.reset()) 1329 await asyncio.sleep(0) 1330 1331 self.assertEqual(barrier.n_waiting, 0) 1332 self.assertFalse(barrier.broken) 1333 1334 async def test_reset_barrier_while_tasks_waiting(self): 1335 barrier = asyncio.Barrier(self.N) 1336 results = [] 1337 1338 async def coro(): 1339 try: 1340 await barrier.wait() 1341 except asyncio.BrokenBarrierError: 1342 results.append(True) 1343 1344 async def coro_reset(): 1345 await barrier.reset() 1346 1347 # N-1 tasks waiting on barrier with N parties 1348 tasks = self.make_tasks(self.N-1, coro) 1349 await asyncio.sleep(0) 1350 1351 # reset the barrier 1352 asyncio.create_task(coro_reset()) 1353 await asyncio.gather(*tasks) 1354 1355 self.assertEqual(len(results), self.N-1) 1356 self.assertTrue(all(results)) 1357 self.assertEqual(barrier.n_waiting, 0) 1358 self.assertNotIn("resetting", repr(barrier)) 1359 self.assertFalse(barrier.broken) 1360 1361 async def test_reset_barrier_when_tasks_half_draining(self): 1362 barrier = asyncio.Barrier(self.N) 1363 results1 = [] 1364 rest_of_tasks = self.N//2 1365 1366 async def coro(): 1367 try: 1368 await barrier.wait() 1369 except asyncio.BrokenBarrierError: 1370 # catch here waiting tasks 1371 results1.append(True) 1372 else: 1373 # here drained task ouside the barrier 1374 if rest_of_tasks == barrier._count: 1375 # tasks outside the barrier 1376 await barrier.reset() 1377 1378 await self.gather_tasks(self.N, coro) 1379 1380 self.assertEqual(results1, [True]*rest_of_tasks) 1381 self.assertEqual(barrier.n_waiting, 0) 1382 self.assertNotIn("resetting", repr(barrier)) 1383 self.assertFalse(barrier.broken) 1384 1385 async def test_reset_barrier_when_tasks_half_draining_half_blocking(self): 1386 barrier = asyncio.Barrier(self.N) 1387 results1 = [] 1388 results2 = [] 1389 blocking_tasks = self.N//2 1390 count = 0 1391 1392 async def coro(): 1393 nonlocal count 1394 try: 1395 await barrier.wait() 1396 except asyncio.BrokenBarrierError: 1397 # here catch still waiting tasks 1398 results1.append(True) 1399 1400 # so now waiting again to reach nb_parties 1401 await barrier.wait() 1402 else: 1403 count += 1 1404 if count > blocking_tasks: 1405 # reset now: raise asyncio.BrokenBarrierError for waiting tasks 1406 await barrier.reset() 1407 1408 # so now waiting again to reach nb_parties 1409 await barrier.wait() 1410 else: 1411 try: 1412 await barrier.wait() 1413 except asyncio.BrokenBarrierError: 1414 # here no catch - blocked tasks go to wait 1415 results2.append(True) 1416 1417 await self.gather_tasks(self.N, coro) 1418 1419 self.assertEqual(results1, [True]*blocking_tasks) 1420 self.assertEqual(results2, []) 1421 self.assertEqual(barrier.n_waiting, 0) 1422 self.assertNotIn("resetting", repr(barrier)) 1423 self.assertFalse(barrier.broken) 1424 1425 async def test_reset_barrier_while_tasks_waiting_and_waiting_again(self): 1426 barrier = asyncio.Barrier(self.N) 1427 results1 = [] 1428 results2 = [] 1429 1430 async def coro1(): 1431 try: 1432 await barrier.wait() 1433 except asyncio.BrokenBarrierError: 1434 results1.append(True) 1435 finally: 1436 await barrier.wait() 1437 results2.append(True) 1438 1439 async def coro2(): 1440 async with barrier: 1441 results2.append(True) 1442 1443 tasks = self.make_tasks(self.N-1, coro1) 1444 1445 # reset barrier, N-1 waiting tasks raise an BrokenBarrierError 1446 asyncio.create_task(barrier.reset()) 1447 await asyncio.sleep(0) 1448 1449 # complete waiting tasks in the `finally` 1450 asyncio.create_task(coro2()) 1451 1452 await asyncio.gather(*tasks) 1453 1454 self.assertFalse(barrier.broken) 1455 self.assertEqual(len(results1), self.N-1) 1456 self.assertTrue(all(results1)) 1457 self.assertEqual(len(results2), self.N) 1458 self.assertTrue(all(results2)) 1459 1460 self.assertEqual(barrier.n_waiting, 0) 1461 1462 1463 async def test_reset_barrier_while_tasks_draining(self): 1464 barrier = asyncio.Barrier(self.N) 1465 results1 = [] 1466 results2 = [] 1467 results3 = [] 1468 count = 0 1469 1470 async def coro(): 1471 nonlocal count 1472 1473 i = await barrier.wait() 1474 count += 1 1475 if count == self.N: 1476 # last task exited from barrier 1477 await barrier.reset() 1478 1479 # wit here to reach the `parties` 1480 await barrier.wait() 1481 else: 1482 try: 1483 # second waiting 1484 await barrier.wait() 1485 1486 # N-1 tasks here 1487 results1.append(True) 1488 except Exception as e: 1489 # never goes here 1490 results2.append(True) 1491 1492 # Now, pass the barrier again 1493 # last wait, must be completed 1494 k = await barrier.wait() 1495 results3.append(True) 1496 1497 await self.gather_tasks(self.N, coro) 1498 1499 self.assertFalse(barrier.broken) 1500 self.assertTrue(all(results1)) 1501 self.assertEqual(len(results1), self.N-1) 1502 self.assertEqual(len(results2), 0) 1503 self.assertEqual(len(results3), self.N) 1504 self.assertTrue(all(results3)) 1505 1506 self.assertEqual(barrier.n_waiting, 0) 1507 1508 async def test_abort_barrier(self): 1509 barrier = asyncio.Barrier(1) 1510 1511 asyncio.create_task(barrier.abort()) 1512 await asyncio.sleep(0) 1513 1514 self.assertEqual(barrier.n_waiting, 0) 1515 self.assertTrue(barrier.broken) 1516 1517 async def test_abort_barrier_when_tasks_half_draining_half_blocking(self): 1518 barrier = asyncio.Barrier(self.N) 1519 results1 = [] 1520 results2 = [] 1521 blocking_tasks = self.N//2 1522 count = 0 1523 1524 async def coro(): 1525 nonlocal count 1526 try: 1527 await barrier.wait() 1528 except asyncio.BrokenBarrierError: 1529 # here catch tasks waiting to drain 1530 results1.append(True) 1531 else: 1532 count += 1 1533 if count > blocking_tasks: 1534 # abort now: raise asyncio.BrokenBarrierError for all tasks 1535 await barrier.abort() 1536 else: 1537 try: 1538 await barrier.wait() 1539 except asyncio.BrokenBarrierError: 1540 # here catch blocked tasks (already drained) 1541 results2.append(True) 1542 1543 await self.gather_tasks(self.N, coro) 1544 1545 self.assertTrue(barrier.broken) 1546 self.assertEqual(results1, [True]*blocking_tasks) 1547 self.assertEqual(results2, [True]*(self.N-blocking_tasks-1)) 1548 self.assertEqual(barrier.n_waiting, 0) 1549 self.assertNotIn("resetting", repr(barrier)) 1550 1551 async def test_abort_barrier_when_exception(self): 1552 # test from threading.Barrier: see `lock_tests.test_reset` 1553 barrier = asyncio.Barrier(self.N) 1554 results1 = [] 1555 results2 = [] 1556 1557 async def coro(): 1558 try: 1559 async with barrier as i : 1560 if i == self.N//2: 1561 raise RuntimeError 1562 async with barrier: 1563 results1.append(True) 1564 except asyncio.BrokenBarrierError: 1565 results2.append(True) 1566 except RuntimeError: 1567 await barrier.abort() 1568 1569 await self.gather_tasks(self.N, coro) 1570 1571 self.assertTrue(barrier.broken) 1572 self.assertEqual(len(results1), 0) 1573 self.assertEqual(len(results2), self.N-1) 1574 self.assertTrue(all(results2)) 1575 self.assertEqual(barrier.n_waiting, 0) 1576 1577 async def test_abort_barrier_when_exception_then_resetting(self): 1578 # test from threading.Barrier: see `lock_tests.test_abort_and_reset`` 1579 barrier1 = asyncio.Barrier(self.N) 1580 barrier2 = asyncio.Barrier(self.N) 1581 results1 = [] 1582 results2 = [] 1583 results3 = [] 1584 1585 async def coro(): 1586 try: 1587 i = await barrier1.wait() 1588 if i == self.N//2: 1589 raise RuntimeError 1590 await barrier1.wait() 1591 results1.append(True) 1592 except asyncio.BrokenBarrierError: 1593 results2.append(True) 1594 except RuntimeError: 1595 await barrier1.abort() 1596 1597 # Synchronize and reset the barrier. Must synchronize first so 1598 # that everyone has left it when we reset, and after so that no 1599 # one enters it before the reset. 1600 i = await barrier2.wait() 1601 if i == self.N//2: 1602 await barrier1.reset() 1603 await barrier2.wait() 1604 await barrier1.wait() 1605 results3.append(True) 1606 1607 await self.gather_tasks(self.N, coro) 1608 1609 self.assertFalse(barrier1.broken) 1610 self.assertEqual(len(results1), 0) 1611 self.assertEqual(len(results2), self.N-1) 1612 self.assertTrue(all(results2)) 1613 self.assertEqual(len(results3), self.N) 1614 self.assertTrue(all(results3)) 1615 1616 self.assertEqual(barrier1.n_waiting, 0) 1617 1618 1619if __name__ == '__main__': 1620 unittest.main() 1621