1"""Tests for locks.py"""
2
3import unittest
4from unittest import mock
5import re
6
7import asyncio
8import collections
9
10STR_RGX_REPR = (
11    r'^<(?P<class>.*?) object at (?P<address>.*?)'
12    r'\[(?P<extras>'
13    r'(set|unset|locked|unlocked|filling|draining|resetting|broken)'
14    r'(, value:\d)?'
15    r'(, waiters:\d+)?'
16    r'(, waiters:\d+\/\d+)?' # barrier
17    r')\]>\Z'
18)
19RGX_REPR = re.compile(STR_RGX_REPR)
20
21
22def tearDownModule():
23    asyncio.set_event_loop_policy(None)
24
25
26class LockTests(unittest.IsolatedAsyncioTestCase):
27
28    async def test_repr(self):
29        lock = asyncio.Lock()
30        self.assertTrue(repr(lock).endswith('[unlocked]>'))
31        self.assertTrue(RGX_REPR.match(repr(lock)))
32
33        await lock.acquire()
34        self.assertTrue(repr(lock).endswith('[locked]>'))
35        self.assertTrue(RGX_REPR.match(repr(lock)))
36
37    async def test_lock(self):
38        lock = asyncio.Lock()
39
40        with self.assertRaisesRegex(
41            TypeError,
42            "object Lock can't be used in 'await' expression"
43        ):
44            await lock
45
46        self.assertFalse(lock.locked())
47
48    async def test_lock_doesnt_accept_loop_parameter(self):
49        primitives_cls = [
50            asyncio.Lock,
51            asyncio.Condition,
52            asyncio.Event,
53            asyncio.Semaphore,
54            asyncio.BoundedSemaphore,
55        ]
56
57        loop = asyncio.get_running_loop()
58
59        for cls in primitives_cls:
60            with self.assertRaisesRegex(
61                TypeError,
62                rf"{cls.__name__}\.__init__\(\) got an unexpected "
63                rf"keyword argument 'loop'"
64            ):
65                cls(loop=loop)
66
67    async def test_lock_by_with_statement(self):
68        primitives = [
69            asyncio.Lock(),
70            asyncio.Condition(),
71            asyncio.Semaphore(),
72            asyncio.BoundedSemaphore(),
73        ]
74
75        for lock in primitives:
76            await asyncio.sleep(0.01)
77            self.assertFalse(lock.locked())
78            with self.assertRaisesRegex(
79                TypeError,
80                r"object \w+ can't be used in 'await' expression"
81            ):
82                with await lock:
83                    pass
84            self.assertFalse(lock.locked())
85
86    async def test_acquire(self):
87        lock = asyncio.Lock()
88        result = []
89
90        self.assertTrue(await lock.acquire())
91
92        async def c1(result):
93            if await lock.acquire():
94                result.append(1)
95            return True
96
97        async def c2(result):
98            if await lock.acquire():
99                result.append(2)
100            return True
101
102        async def c3(result):
103            if await lock.acquire():
104                result.append(3)
105            return True
106
107        t1 = asyncio.create_task(c1(result))
108        t2 = asyncio.create_task(c2(result))
109
110        await asyncio.sleep(0)
111        self.assertEqual([], result)
112
113        lock.release()
114        await asyncio.sleep(0)
115        self.assertEqual([1], result)
116
117        await asyncio.sleep(0)
118        self.assertEqual([1], result)
119
120        t3 = asyncio.create_task(c3(result))
121
122        lock.release()
123        await asyncio.sleep(0)
124        self.assertEqual([1, 2], result)
125
126        lock.release()
127        await asyncio.sleep(0)
128        self.assertEqual([1, 2, 3], result)
129
130        self.assertTrue(t1.done())
131        self.assertTrue(t1.result())
132        self.assertTrue(t2.done())
133        self.assertTrue(t2.result())
134        self.assertTrue(t3.done())
135        self.assertTrue(t3.result())
136
137    async def test_acquire_cancel(self):
138        lock = asyncio.Lock()
139        self.assertTrue(await lock.acquire())
140
141        task = asyncio.create_task(lock.acquire())
142        asyncio.get_running_loop().call_soon(task.cancel)
143        with self.assertRaises(asyncio.CancelledError):
144            await task
145        self.assertFalse(lock._waiters)
146
147    async def test_cancel_race(self):
148        # Several tasks:
149        # - A acquires the lock
150        # - B is blocked in acquire()
151        # - C is blocked in acquire()
152        #
153        # Now, concurrently:
154        # - B is cancelled
155        # - A releases the lock
156        #
157        # If B's waiter is marked cancelled but not yet removed from
158        # _waiters, A's release() call will crash when trying to set
159        # B's waiter; instead, it should move on to C's waiter.
160
161        # Setup: A has the lock, b and c are waiting.
162        lock = asyncio.Lock()
163
164        async def lockit(name, blocker):
165            await lock.acquire()
166            try:
167                if blocker is not None:
168                    await blocker
169            finally:
170                lock.release()
171
172        fa = asyncio.get_running_loop().create_future()
173        ta = asyncio.create_task(lockit('A', fa))
174        await asyncio.sleep(0)
175        self.assertTrue(lock.locked())
176        tb = asyncio.create_task(lockit('B', None))
177        await asyncio.sleep(0)
178        self.assertEqual(len(lock._waiters), 1)
179        tc = asyncio.create_task(lockit('C', None))
180        await asyncio.sleep(0)
181        self.assertEqual(len(lock._waiters), 2)
182
183        # Create the race and check.
184        # Without the fix this failed at the last assert.
185        fa.set_result(None)
186        tb.cancel()
187        self.assertTrue(lock._waiters[0].cancelled())
188        await asyncio.sleep(0)
189        self.assertFalse(lock.locked())
190        self.assertTrue(ta.done())
191        self.assertTrue(tb.cancelled())
192        await tc
193
194    async def test_cancel_release_race(self):
195        # Issue 32734
196        # Acquire 4 locks, cancel second, release first
197        # and 2 locks are taken at once.
198        loop = asyncio.get_running_loop()
199        lock = asyncio.Lock()
200        lock_count = 0
201        call_count = 0
202
203        async def lockit():
204            nonlocal lock_count
205            nonlocal call_count
206            call_count += 1
207            await lock.acquire()
208            lock_count += 1
209
210        def trigger():
211            t1.cancel()
212            lock.release()
213
214        await lock.acquire()
215
216        t1 = asyncio.create_task(lockit())
217        t2 = asyncio.create_task(lockit())
218        t3 = asyncio.create_task(lockit())
219
220        # Start scheduled tasks
221        await asyncio.sleep(0)
222
223        loop.call_soon(trigger)
224        with self.assertRaises(asyncio.CancelledError):
225            # Wait for cancellation
226            await t1
227
228        # Make sure only one lock was taken
229        self.assertEqual(lock_count, 1)
230        # While 3 calls were made to lockit()
231        self.assertEqual(call_count, 3)
232        self.assertTrue(t1.cancelled() and t2.done())
233
234        # Cleanup the task that is stuck on acquire.
235        t3.cancel()
236        await asyncio.sleep(0)
237        self.assertTrue(t3.cancelled())
238
239    async def test_finished_waiter_cancelled(self):
240        lock = asyncio.Lock()
241
242        await lock.acquire()
243        self.assertTrue(lock.locked())
244
245        tb = asyncio.create_task(lock.acquire())
246        await asyncio.sleep(0)
247        self.assertEqual(len(lock._waiters), 1)
248
249        # Create a second waiter, wake up the first, and cancel it.
250        # Without the fix, the second was not woken up.
251        tc = asyncio.create_task(lock.acquire())
252        tb.cancel()
253        lock.release()
254        await asyncio.sleep(0)
255
256        self.assertTrue(lock.locked())
257        self.assertTrue(tb.cancelled())
258
259        # Cleanup
260        await tc
261
262    async def test_release_not_acquired(self):
263        lock = asyncio.Lock()
264
265        self.assertRaises(RuntimeError, lock.release)
266
267    async def test_release_no_waiters(self):
268        lock = asyncio.Lock()
269        await lock.acquire()
270        self.assertTrue(lock.locked())
271
272        lock.release()
273        self.assertFalse(lock.locked())
274
275    async def test_context_manager(self):
276        lock = asyncio.Lock()
277        self.assertFalse(lock.locked())
278
279        async with lock:
280            self.assertTrue(lock.locked())
281
282        self.assertFalse(lock.locked())
283
284
285class EventTests(unittest.IsolatedAsyncioTestCase):
286
287    def test_repr(self):
288        ev = asyncio.Event()
289        self.assertTrue(repr(ev).endswith('[unset]>'))
290        match = RGX_REPR.match(repr(ev))
291        self.assertEqual(match.group('extras'), 'unset')
292
293        ev.set()
294        self.assertTrue(repr(ev).endswith('[set]>'))
295        self.assertTrue(RGX_REPR.match(repr(ev)))
296
297        ev._waiters.append(mock.Mock())
298        self.assertTrue('waiters:1' in repr(ev))
299        self.assertTrue(RGX_REPR.match(repr(ev)))
300
301    async def test_wait(self):
302        ev = asyncio.Event()
303        self.assertFalse(ev.is_set())
304
305        result = []
306
307        async def c1(result):
308            if await ev.wait():
309                result.append(1)
310
311        async def c2(result):
312            if await ev.wait():
313                result.append(2)
314
315        async def c3(result):
316            if await ev.wait():
317                result.append(3)
318
319        t1 = asyncio.create_task(c1(result))
320        t2 = asyncio.create_task(c2(result))
321
322        await asyncio.sleep(0)
323        self.assertEqual([], result)
324
325        t3 = asyncio.create_task(c3(result))
326
327        ev.set()
328        await asyncio.sleep(0)
329        self.assertEqual([3, 1, 2], result)
330
331        self.assertTrue(t1.done())
332        self.assertIsNone(t1.result())
333        self.assertTrue(t2.done())
334        self.assertIsNone(t2.result())
335        self.assertTrue(t3.done())
336        self.assertIsNone(t3.result())
337
338    async def test_wait_on_set(self):
339        ev = asyncio.Event()
340        ev.set()
341
342        res = await ev.wait()
343        self.assertTrue(res)
344
345    async def test_wait_cancel(self):
346        ev = asyncio.Event()
347
348        wait = asyncio.create_task(ev.wait())
349        asyncio.get_running_loop().call_soon(wait.cancel)
350        with self.assertRaises(asyncio.CancelledError):
351            await wait
352        self.assertFalse(ev._waiters)
353
354    async def test_clear(self):
355        ev = asyncio.Event()
356        self.assertFalse(ev.is_set())
357
358        ev.set()
359        self.assertTrue(ev.is_set())
360
361        ev.clear()
362        self.assertFalse(ev.is_set())
363
364    async def test_clear_with_waiters(self):
365        ev = asyncio.Event()
366        result = []
367
368        async def c1(result):
369            if await ev.wait():
370                result.append(1)
371            return True
372
373        t = asyncio.create_task(c1(result))
374        await asyncio.sleep(0)
375        self.assertEqual([], result)
376
377        ev.set()
378        ev.clear()
379        self.assertFalse(ev.is_set())
380
381        ev.set()
382        ev.set()
383        self.assertEqual(1, len(ev._waiters))
384
385        await asyncio.sleep(0)
386        self.assertEqual([1], result)
387        self.assertEqual(0, len(ev._waiters))
388
389        self.assertTrue(t.done())
390        self.assertTrue(t.result())
391
392
393class ConditionTests(unittest.IsolatedAsyncioTestCase):
394
395    async def test_wait(self):
396        cond = asyncio.Condition()
397        result = []
398
399        async def c1(result):
400            await cond.acquire()
401            if await cond.wait():
402                result.append(1)
403            return True
404
405        async def c2(result):
406            await cond.acquire()
407            if await cond.wait():
408                result.append(2)
409            return True
410
411        async def c3(result):
412            await cond.acquire()
413            if await cond.wait():
414                result.append(3)
415            return True
416
417        t1 = asyncio.create_task(c1(result))
418        t2 = asyncio.create_task(c2(result))
419        t3 = asyncio.create_task(c3(result))
420
421        await asyncio.sleep(0)
422        self.assertEqual([], result)
423        self.assertFalse(cond.locked())
424
425        self.assertTrue(await cond.acquire())
426        cond.notify()
427        await asyncio.sleep(0)
428        self.assertEqual([], result)
429        self.assertTrue(cond.locked())
430
431        cond.release()
432        await asyncio.sleep(0)
433        self.assertEqual([1], result)
434        self.assertTrue(cond.locked())
435
436        cond.notify(2)
437        await asyncio.sleep(0)
438        self.assertEqual([1], result)
439        self.assertTrue(cond.locked())
440
441        cond.release()
442        await asyncio.sleep(0)
443        self.assertEqual([1, 2], result)
444        self.assertTrue(cond.locked())
445
446        cond.release()
447        await asyncio.sleep(0)
448        self.assertEqual([1, 2, 3], result)
449        self.assertTrue(cond.locked())
450
451        self.assertTrue(t1.done())
452        self.assertTrue(t1.result())
453        self.assertTrue(t2.done())
454        self.assertTrue(t2.result())
455        self.assertTrue(t3.done())
456        self.assertTrue(t3.result())
457
458    async def test_wait_cancel(self):
459        cond = asyncio.Condition()
460        await cond.acquire()
461
462        wait = asyncio.create_task(cond.wait())
463        asyncio.get_running_loop().call_soon(wait.cancel)
464        with self.assertRaises(asyncio.CancelledError):
465            await wait
466        self.assertFalse(cond._waiters)
467        self.assertTrue(cond.locked())
468
469    async def test_wait_cancel_contested(self):
470        cond = asyncio.Condition()
471
472        await cond.acquire()
473        self.assertTrue(cond.locked())
474
475        wait_task = asyncio.create_task(cond.wait())
476        await asyncio.sleep(0)
477        self.assertFalse(cond.locked())
478
479        # Notify, but contest the lock before cancelling
480        await cond.acquire()
481        self.assertTrue(cond.locked())
482        cond.notify()
483        asyncio.get_running_loop().call_soon(wait_task.cancel)
484        asyncio.get_running_loop().call_soon(cond.release)
485
486        try:
487            await wait_task
488        except asyncio.CancelledError:
489            # Should not happen, since no cancellation points
490            pass
491
492        self.assertTrue(cond.locked())
493
494    async def test_wait_cancel_after_notify(self):
495        # See bpo-32841
496        waited = False
497
498        cond = asyncio.Condition()
499
500        async def wait_on_cond():
501            nonlocal waited
502            async with cond:
503                waited = True  # Make sure this area was reached
504                await cond.wait()
505
506        waiter = asyncio.create_task(wait_on_cond())
507        await asyncio.sleep(0)  # Start waiting
508
509        await cond.acquire()
510        cond.notify()
511        await asyncio.sleep(0)  # Get to acquire()
512        waiter.cancel()
513        await asyncio.sleep(0)  # Activate cancellation
514        cond.release()
515        await asyncio.sleep(0)  # Cancellation should occur
516
517        self.assertTrue(waiter.cancelled())
518        self.assertTrue(waited)
519
520    async def test_wait_unacquired(self):
521        cond = asyncio.Condition()
522        with self.assertRaises(RuntimeError):
523            await cond.wait()
524
525    async def test_wait_for(self):
526        cond = asyncio.Condition()
527        presult = False
528
529        def predicate():
530            return presult
531
532        result = []
533
534        async def c1(result):
535            await cond.acquire()
536            if await cond.wait_for(predicate):
537                result.append(1)
538                cond.release()
539            return True
540
541        t = asyncio.create_task(c1(result))
542
543        await asyncio.sleep(0)
544        self.assertEqual([], result)
545
546        await cond.acquire()
547        cond.notify()
548        cond.release()
549        await asyncio.sleep(0)
550        self.assertEqual([], result)
551
552        presult = True
553        await cond.acquire()
554        cond.notify()
555        cond.release()
556        await asyncio.sleep(0)
557        self.assertEqual([1], result)
558
559        self.assertTrue(t.done())
560        self.assertTrue(t.result())
561
562    async def test_wait_for_unacquired(self):
563        cond = asyncio.Condition()
564
565        # predicate can return true immediately
566        res = await cond.wait_for(lambda: [1, 2, 3])
567        self.assertEqual([1, 2, 3], res)
568
569        with self.assertRaises(RuntimeError):
570            await cond.wait_for(lambda: False)
571
572    async def test_notify(self):
573        cond = asyncio.Condition()
574        result = []
575
576        async def c1(result):
577            await cond.acquire()
578            if await cond.wait():
579                result.append(1)
580                cond.release()
581            return True
582
583        async def c2(result):
584            await cond.acquire()
585            if await cond.wait():
586                result.append(2)
587                cond.release()
588            return True
589
590        async def c3(result):
591            await cond.acquire()
592            if await cond.wait():
593                result.append(3)
594                cond.release()
595            return True
596
597        t1 = asyncio.create_task(c1(result))
598        t2 = asyncio.create_task(c2(result))
599        t3 = asyncio.create_task(c3(result))
600
601        await asyncio.sleep(0)
602        self.assertEqual([], result)
603
604        await cond.acquire()
605        cond.notify(1)
606        cond.release()
607        await asyncio.sleep(0)
608        self.assertEqual([1], result)
609
610        await cond.acquire()
611        cond.notify(1)
612        cond.notify(2048)
613        cond.release()
614        await asyncio.sleep(0)
615        self.assertEqual([1, 2, 3], result)
616
617        self.assertTrue(t1.done())
618        self.assertTrue(t1.result())
619        self.assertTrue(t2.done())
620        self.assertTrue(t2.result())
621        self.assertTrue(t3.done())
622        self.assertTrue(t3.result())
623
624    async def test_notify_all(self):
625        cond = asyncio.Condition()
626
627        result = []
628
629        async def c1(result):
630            await cond.acquire()
631            if await cond.wait():
632                result.append(1)
633                cond.release()
634            return True
635
636        async def c2(result):
637            await cond.acquire()
638            if await cond.wait():
639                result.append(2)
640                cond.release()
641            return True
642
643        t1 = asyncio.create_task(c1(result))
644        t2 = asyncio.create_task(c2(result))
645
646        await asyncio.sleep(0)
647        self.assertEqual([], result)
648
649        await cond.acquire()
650        cond.notify_all()
651        cond.release()
652        await asyncio.sleep(0)
653        self.assertEqual([1, 2], result)
654
655        self.assertTrue(t1.done())
656        self.assertTrue(t1.result())
657        self.assertTrue(t2.done())
658        self.assertTrue(t2.result())
659
660    def test_notify_unacquired(self):
661        cond = asyncio.Condition()
662        self.assertRaises(RuntimeError, cond.notify)
663
664    def test_notify_all_unacquired(self):
665        cond = asyncio.Condition()
666        self.assertRaises(RuntimeError, cond.notify_all)
667
668    async def test_repr(self):
669        cond = asyncio.Condition()
670        self.assertTrue('unlocked' in repr(cond))
671        self.assertTrue(RGX_REPR.match(repr(cond)))
672
673        await cond.acquire()
674        self.assertTrue('locked' in repr(cond))
675
676        cond._waiters.append(mock.Mock())
677        self.assertTrue('waiters:1' in repr(cond))
678        self.assertTrue(RGX_REPR.match(repr(cond)))
679
680        cond._waiters.append(mock.Mock())
681        self.assertTrue('waiters:2' in repr(cond))
682        self.assertTrue(RGX_REPR.match(repr(cond)))
683
684    async def test_context_manager(self):
685        cond = asyncio.Condition()
686        self.assertFalse(cond.locked())
687        async with cond:
688            self.assertTrue(cond.locked())
689        self.assertFalse(cond.locked())
690
691    async def test_explicit_lock(self):
692        async def f(lock=None, cond=None):
693            if lock is None:
694                lock = asyncio.Lock()
695            if cond is None:
696                cond = asyncio.Condition(lock)
697            self.assertIs(cond._lock, lock)
698            self.assertFalse(lock.locked())
699            self.assertFalse(cond.locked())
700            async with cond:
701                self.assertTrue(lock.locked())
702                self.assertTrue(cond.locked())
703            self.assertFalse(lock.locked())
704            self.assertFalse(cond.locked())
705            async with lock:
706                self.assertTrue(lock.locked())
707                self.assertTrue(cond.locked())
708            self.assertFalse(lock.locked())
709            self.assertFalse(cond.locked())
710
711        # All should work in the same way.
712        await f()
713        await f(asyncio.Lock())
714        lock = asyncio.Lock()
715        await f(lock, asyncio.Condition(lock))
716
717    async def test_ambiguous_loops(self):
718        loop = asyncio.new_event_loop()
719        self.addCleanup(loop.close)
720
721        async def wrong_loop_in_lock():
722            with self.assertRaises(TypeError):
723                asyncio.Lock(loop=loop)  # actively disallowed since 3.10
724            lock = asyncio.Lock()
725            lock._loop = loop  # use private API for testing
726            async with lock:
727                # acquired immediately via the fast-path
728                # without interaction with any event loop.
729                cond = asyncio.Condition(lock)
730                # cond.acquire() will trigger waiting on the lock
731                # and it will discover the event loop mismatch.
732                with self.assertRaisesRegex(
733                    RuntimeError,
734                    "is bound to a different event loop",
735                ):
736                    await cond.acquire()
737
738        async def wrong_loop_in_cond():
739            # Same analogy here with the condition's loop.
740            lock = asyncio.Lock()
741            async with lock:
742                with self.assertRaises(TypeError):
743                    asyncio.Condition(lock, loop=loop)
744                cond = asyncio.Condition(lock)
745                cond._loop = loop
746                with self.assertRaisesRegex(
747                    RuntimeError,
748                    "is bound to a different event loop",
749                ):
750                    await cond.wait()
751
752        await wrong_loop_in_lock()
753        await wrong_loop_in_cond()
754
755    async def test_timeout_in_block(self):
756        condition = asyncio.Condition()
757        async with condition:
758            with self.assertRaises(asyncio.TimeoutError):
759                await asyncio.wait_for(condition.wait(), timeout=0.5)
760
761
762class SemaphoreTests(unittest.IsolatedAsyncioTestCase):
763
764    def test_initial_value_zero(self):
765        sem = asyncio.Semaphore(0)
766        self.assertTrue(sem.locked())
767
768    async def test_repr(self):
769        sem = asyncio.Semaphore()
770        self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
771        self.assertTrue(RGX_REPR.match(repr(sem)))
772
773        await sem.acquire()
774        self.assertTrue(repr(sem).endswith('[locked]>'))
775        self.assertTrue('waiters' not in repr(sem))
776        self.assertTrue(RGX_REPR.match(repr(sem)))
777
778        if sem._waiters is None:
779            sem._waiters = collections.deque()
780
781        sem._waiters.append(mock.Mock())
782        self.assertTrue('waiters:1' in repr(sem))
783        self.assertTrue(RGX_REPR.match(repr(sem)))
784
785        sem._waiters.append(mock.Mock())
786        self.assertTrue('waiters:2' in repr(sem))
787        self.assertTrue(RGX_REPR.match(repr(sem)))
788
789    async def test_semaphore(self):
790        sem = asyncio.Semaphore()
791        self.assertEqual(1, sem._value)
792
793        with self.assertRaisesRegex(
794            TypeError,
795            "object Semaphore can't be used in 'await' expression",
796        ):
797            await sem
798
799        self.assertFalse(sem.locked())
800        self.assertEqual(1, sem._value)
801
802    def test_semaphore_value(self):
803        self.assertRaises(ValueError, asyncio.Semaphore, -1)
804
805    async def test_acquire(self):
806        sem = asyncio.Semaphore(3)
807        result = []
808
809        self.assertTrue(await sem.acquire())
810        self.assertTrue(await sem.acquire())
811        self.assertFalse(sem.locked())
812
813        async def c1(result):
814            await sem.acquire()
815            result.append(1)
816            return True
817
818        async def c2(result):
819            await sem.acquire()
820            result.append(2)
821            return True
822
823        async def c3(result):
824            await sem.acquire()
825            result.append(3)
826            return True
827
828        async def c4(result):
829            await sem.acquire()
830            result.append(4)
831            return True
832
833        t1 = asyncio.create_task(c1(result))
834        t2 = asyncio.create_task(c2(result))
835        t3 = asyncio.create_task(c3(result))
836
837        await asyncio.sleep(0)
838        self.assertEqual([1], result)
839        self.assertTrue(sem.locked())
840        self.assertEqual(2, len(sem._waiters))
841        self.assertEqual(0, sem._value)
842
843        t4 = asyncio.create_task(c4(result))
844
845        sem.release()
846        sem.release()
847        self.assertEqual(0, sem._value)
848
849        await asyncio.sleep(0)
850        self.assertEqual(0, sem._value)
851        self.assertEqual(3, len(result))
852        self.assertTrue(sem.locked())
853        self.assertEqual(1, len(sem._waiters))
854        self.assertEqual(0, sem._value)
855
856        self.assertTrue(t1.done())
857        self.assertTrue(t1.result())
858        race_tasks = [t2, t3, t4]
859        done_tasks = [t for t in race_tasks if t.done() and t.result()]
860        self.assertEqual(2, len(done_tasks))
861
862        # cleanup locked semaphore
863        sem.release()
864        await asyncio.gather(*race_tasks)
865
866    async def test_acquire_cancel(self):
867        sem = asyncio.Semaphore()
868        await sem.acquire()
869
870        acquire = asyncio.create_task(sem.acquire())
871        asyncio.get_running_loop().call_soon(acquire.cancel)
872        with self.assertRaises(asyncio.CancelledError):
873            await acquire
874        self.assertTrue((not sem._waiters) or
875                        all(waiter.done() for waiter in sem._waiters))
876
877    async def test_acquire_cancel_before_awoken(self):
878        sem = asyncio.Semaphore(value=0)
879
880        t1 = asyncio.create_task(sem.acquire())
881        t2 = asyncio.create_task(sem.acquire())
882        t3 = asyncio.create_task(sem.acquire())
883        t4 = asyncio.create_task(sem.acquire())
884
885        await asyncio.sleep(0)
886
887        t1.cancel()
888        t2.cancel()
889        sem.release()
890
891        await asyncio.sleep(0)
892        await asyncio.sleep(0)
893        num_done = sum(t.done() for t in [t3, t4])
894        self.assertEqual(num_done, 1)
895        self.assertTrue(t3.done())
896        self.assertFalse(t4.done())
897
898        t3.cancel()
899        t4.cancel()
900        await asyncio.sleep(0)
901
902    async def test_acquire_hang(self):
903        sem = asyncio.Semaphore(value=0)
904
905        t1 = asyncio.create_task(sem.acquire())
906        t2 = asyncio.create_task(sem.acquire())
907        await asyncio.sleep(0)
908
909        t1.cancel()
910        sem.release()
911        await asyncio.sleep(0)
912        await asyncio.sleep(0)
913        self.assertTrue(sem.locked())
914        self.assertTrue(t2.done())
915
916    async def test_acquire_no_hang(self):
917
918        sem = asyncio.Semaphore(1)
919
920        async def c1():
921            async with sem:
922                await asyncio.sleep(0)
923            t2.cancel()
924
925        async def c2():
926            async with sem:
927                self.assertFalse(True)
928
929        t1 = asyncio.create_task(c1())
930        t2 = asyncio.create_task(c2())
931
932        r1, r2 = await asyncio.gather(t1, t2, return_exceptions=True)
933        self.assertTrue(r1 is None)
934        self.assertTrue(isinstance(r2, asyncio.CancelledError))
935
936        await asyncio.wait_for(sem.acquire(), timeout=1.0)
937
938    def test_release_not_acquired(self):
939        sem = asyncio.BoundedSemaphore()
940
941        self.assertRaises(ValueError, sem.release)
942
943    async def test_release_no_waiters(self):
944        sem = asyncio.Semaphore()
945        await sem.acquire()
946        self.assertTrue(sem.locked())
947
948        sem.release()
949        self.assertFalse(sem.locked())
950
951    async def test_acquire_fifo_order(self):
952        sem = asyncio.Semaphore(1)
953        result = []
954
955        async def coro(tag):
956            await sem.acquire()
957            result.append(f'{tag}_1')
958            await asyncio.sleep(0.01)
959            sem.release()
960
961            await sem.acquire()
962            result.append(f'{tag}_2')
963            await asyncio.sleep(0.01)
964            sem.release()
965
966        async with asyncio.TaskGroup() as tg:
967            tg.create_task(coro('c1'))
968            tg.create_task(coro('c2'))
969            tg.create_task(coro('c3'))
970
971        self.assertEqual(
972            ['c1_1', 'c2_1', 'c3_1', 'c1_2', 'c2_2', 'c3_2'],
973            result
974        )
975
976    async def test_acquire_fifo_order_2(self):
977        sem = asyncio.Semaphore(1)
978        result = []
979
980        async def c1(result):
981            await sem.acquire()
982            result.append(1)
983            return True
984
985        async def c2(result):
986            await sem.acquire()
987            result.append(2)
988            sem.release()
989            await sem.acquire()
990            result.append(4)
991            return True
992
993        async def c3(result):
994            await sem.acquire()
995            result.append(3)
996            return True
997
998        t1 = asyncio.create_task(c1(result))
999        t2 = asyncio.create_task(c2(result))
1000        t3 = asyncio.create_task(c3(result))
1001
1002        await asyncio.sleep(0)
1003
1004        sem.release()
1005        sem.release()
1006
1007        tasks = [t1, t2, t3]
1008        await asyncio.gather(*tasks)
1009        self.assertEqual([1, 2, 3, 4], result)
1010
1011    async def test_acquire_fifo_order_3(self):
1012        sem = asyncio.Semaphore(0)
1013        result = []
1014
1015        async def c1(result):
1016            await sem.acquire()
1017            result.append(1)
1018            return True
1019
1020        async def c2(result):
1021            await sem.acquire()
1022            result.append(2)
1023            return True
1024
1025        async def c3(result):
1026            await sem.acquire()
1027            result.append(3)
1028            return True
1029
1030        t1 = asyncio.create_task(c1(result))
1031        t2 = asyncio.create_task(c2(result))
1032        t3 = asyncio.create_task(c3(result))
1033
1034        await asyncio.sleep(0)
1035
1036        t1.cancel()
1037
1038        await asyncio.sleep(0)
1039
1040        sem.release()
1041        sem.release()
1042
1043        tasks = [t1, t2, t3]
1044        await asyncio.gather(*tasks, return_exceptions=True)
1045        self.assertEqual([2, 3], result)
1046
1047
1048class BarrierTests(unittest.IsolatedAsyncioTestCase):
1049
1050    async def asyncSetUp(self):
1051        await super().asyncSetUp()
1052        self.N = 5
1053
1054    def make_tasks(self, n, coro):
1055        tasks = [asyncio.create_task(coro()) for _ in range(n)]
1056        return tasks
1057
1058    async def gather_tasks(self, n, coro):
1059        tasks = self.make_tasks(n, coro)
1060        res = await asyncio.gather(*tasks)
1061        return res, tasks
1062
1063    async def test_barrier(self):
1064        barrier = asyncio.Barrier(self.N)
1065        self.assertIn("filling", repr(barrier))
1066        with self.assertRaisesRegex(
1067            TypeError,
1068            "object Barrier can't be used in 'await' expression",
1069        ):
1070            await barrier
1071
1072        self.assertIn("filling", repr(barrier))
1073
1074    async def test_repr(self):
1075        barrier = asyncio.Barrier(self.N)
1076
1077        self.assertTrue(RGX_REPR.match(repr(barrier)))
1078        self.assertIn("filling", repr(barrier))
1079
1080        waiters = []
1081        async def wait(barrier):
1082            await barrier.wait()
1083
1084        incr = 2
1085        for i in range(incr):
1086            waiters.append(asyncio.create_task(wait(barrier)))
1087        await asyncio.sleep(0)
1088
1089        self.assertTrue(RGX_REPR.match(repr(barrier)))
1090        self.assertTrue(f"waiters:{incr}/{self.N}" in repr(barrier))
1091        self.assertIn("filling", repr(barrier))
1092
1093        # create missing waiters
1094        for i in range(barrier.parties - barrier.n_waiting):
1095            waiters.append(asyncio.create_task(wait(barrier)))
1096        await asyncio.sleep(0)
1097
1098        self.assertTrue(RGX_REPR.match(repr(barrier)))
1099        self.assertIn("draining", repr(barrier))
1100
1101        # add a part of waiters
1102        for i in range(incr):
1103            waiters.append(asyncio.create_task(wait(barrier)))
1104        await asyncio.sleep(0)
1105        # and reset
1106        await barrier.reset()
1107
1108        self.assertTrue(RGX_REPR.match(repr(barrier)))
1109        self.assertIn("resetting", repr(barrier))
1110
1111        # add a part of waiters again
1112        for i in range(incr):
1113            waiters.append(asyncio.create_task(wait(barrier)))
1114        await asyncio.sleep(0)
1115        # and abort
1116        await barrier.abort()
1117
1118        self.assertTrue(RGX_REPR.match(repr(barrier)))
1119        self.assertIn("broken", repr(barrier))
1120        self.assertTrue(barrier.broken)
1121
1122        # suppress unhandled exceptions
1123        await asyncio.gather(*waiters, return_exceptions=True)
1124
1125    async def test_barrier_parties(self):
1126        self.assertRaises(ValueError, lambda: asyncio.Barrier(0))
1127        self.assertRaises(ValueError, lambda: asyncio.Barrier(-4))
1128
1129        self.assertIsInstance(asyncio.Barrier(self.N), asyncio.Barrier)
1130
1131    async def test_context_manager(self):
1132        self.N = 3
1133        barrier = asyncio.Barrier(self.N)
1134        results = []
1135
1136        async def coro():
1137            async with barrier as i:
1138                results.append(i)
1139
1140        await self.gather_tasks(self.N, coro)
1141
1142        self.assertListEqual(sorted(results), list(range(self.N)))
1143        self.assertEqual(barrier.n_waiting, 0)
1144        self.assertFalse(barrier.broken)
1145
1146    async def test_filling_one_task(self):
1147        barrier = asyncio.Barrier(1)
1148
1149        async def f():
1150            async with barrier as i:
1151                return True
1152
1153        ret = await f()
1154
1155        self.assertTrue(ret)
1156        self.assertEqual(barrier.n_waiting, 0)
1157        self.assertFalse(barrier.broken)
1158
1159    async def test_filling_one_task_twice(self):
1160        barrier = asyncio.Barrier(1)
1161
1162        t1 = asyncio.create_task(barrier.wait())
1163        await asyncio.sleep(0)
1164        self.assertEqual(barrier.n_waiting, 0)
1165
1166        t2 = asyncio.create_task(barrier.wait())
1167        await asyncio.sleep(0)
1168
1169        self.assertEqual(t1.result(), t2.result())
1170        self.assertEqual(t1.done(), t2.done())
1171
1172        self.assertEqual(barrier.n_waiting, 0)
1173        self.assertFalse(barrier.broken)
1174
1175    async def test_filling_task_by_task(self):
1176        self.N = 3
1177        barrier = asyncio.Barrier(self.N)
1178
1179        t1 = asyncio.create_task(barrier.wait())
1180        await asyncio.sleep(0)
1181        self.assertEqual(barrier.n_waiting, 1)
1182        self.assertIn("filling", repr(barrier))
1183
1184        t2 = asyncio.create_task(barrier.wait())
1185        await asyncio.sleep(0)
1186        self.assertEqual(barrier.n_waiting, 2)
1187        self.assertIn("filling", repr(barrier))
1188
1189        t3 = asyncio.create_task(barrier.wait())
1190        await asyncio.sleep(0)
1191
1192        await asyncio.wait([t1, t2, t3])
1193
1194        self.assertEqual(barrier.n_waiting, 0)
1195        self.assertFalse(barrier.broken)
1196
1197    async def test_filling_tasks_wait_twice(self):
1198        barrier = asyncio.Barrier(self.N)
1199        results = []
1200
1201        async def coro():
1202            async with barrier:
1203                results.append(True)
1204
1205                async with barrier:
1206                    results.append(False)
1207
1208        await self.gather_tasks(self.N, coro)
1209
1210        self.assertEqual(len(results), self.N*2)
1211        self.assertEqual(results.count(True), self.N)
1212        self.assertEqual(results.count(False), self.N)
1213
1214        self.assertEqual(barrier.n_waiting, 0)
1215        self.assertFalse(barrier.broken)
1216
1217    async def test_filling_tasks_check_return_value(self):
1218        barrier = asyncio.Barrier(self.N)
1219        results1 = []
1220        results2 = []
1221
1222        async def coro():
1223            async with barrier:
1224                results1.append(True)
1225
1226                async with barrier as i:
1227                    results2.append(True)
1228                    return i
1229
1230        res, _ = await self.gather_tasks(self.N, coro)
1231
1232        self.assertEqual(len(results1), self.N)
1233        self.assertTrue(all(results1))
1234        self.assertEqual(len(results2), self.N)
1235        self.assertTrue(all(results2))
1236        self.assertListEqual(sorted(res), list(range(self.N)))
1237
1238        self.assertEqual(barrier.n_waiting, 0)
1239        self.assertFalse(barrier.broken)
1240
1241    async def test_draining_state(self):
1242        barrier = asyncio.Barrier(self.N)
1243        results = []
1244
1245        async def coro():
1246            async with barrier:
1247                # barrier state change to filling for the last task release
1248                results.append("draining" in repr(barrier))
1249
1250        await self.gather_tasks(self.N, coro)
1251
1252        self.assertEqual(len(results), self.N)
1253        self.assertEqual(results[-1], False)
1254        self.assertTrue(all(results[:self.N-1]))
1255
1256        self.assertEqual(barrier.n_waiting, 0)
1257        self.assertFalse(barrier.broken)
1258
1259    async def test_blocking_tasks_while_draining(self):
1260        rewait = 2
1261        barrier = asyncio.Barrier(self.N)
1262        barrier_nowaiting = asyncio.Barrier(self.N - rewait)
1263        results = []
1264        rewait_n = rewait
1265        counter = 0
1266
1267        async def coro():
1268            nonlocal rewait_n
1269
1270            # first time waiting
1271            await barrier.wait()
1272
1273            # after wainting once for all tasks
1274            if rewait_n > 0:
1275                rewait_n -= 1
1276                # wait again only for rewait tasks
1277                await barrier.wait()
1278            else:
1279                # wait for end of draining state`
1280                await barrier_nowaiting.wait()
1281                # wait for other waiting tasks
1282                await barrier.wait()
1283
1284        # a success means that barrier_nowaiting
1285        # was waited for exactly N-rewait=3 times
1286        await self.gather_tasks(self.N, coro)
1287
1288    async def test_filling_tasks_cancel_one(self):
1289        self.N = 3
1290        barrier = asyncio.Barrier(self.N)
1291        results = []
1292
1293        async def coro():
1294            await barrier.wait()
1295            results.append(True)
1296
1297        t1 = asyncio.create_task(coro())
1298        await asyncio.sleep(0)
1299        self.assertEqual(barrier.n_waiting, 1)
1300
1301        t2 = asyncio.create_task(coro())
1302        await asyncio.sleep(0)
1303        self.assertEqual(barrier.n_waiting, 2)
1304
1305        t1.cancel()
1306        await asyncio.sleep(0)
1307        self.assertEqual(barrier.n_waiting, 1)
1308        with self.assertRaises(asyncio.CancelledError):
1309            await t1
1310        self.assertTrue(t1.cancelled())
1311
1312        t3 = asyncio.create_task(coro())
1313        await asyncio.sleep(0)
1314        self.assertEqual(barrier.n_waiting, 2)
1315
1316        t4 = asyncio.create_task(coro())
1317        await asyncio.gather(t2, t3, t4)
1318
1319        self.assertEqual(len(results), self.N)
1320        self.assertTrue(all(results))
1321
1322        self.assertEqual(barrier.n_waiting, 0)
1323        self.assertFalse(barrier.broken)
1324
1325    async def test_reset_barrier(self):
1326        barrier = asyncio.Barrier(1)
1327
1328        asyncio.create_task(barrier.reset())
1329        await asyncio.sleep(0)
1330
1331        self.assertEqual(barrier.n_waiting, 0)
1332        self.assertFalse(barrier.broken)
1333
1334    async def test_reset_barrier_while_tasks_waiting(self):
1335        barrier = asyncio.Barrier(self.N)
1336        results = []
1337
1338        async def coro():
1339            try:
1340                await barrier.wait()
1341            except asyncio.BrokenBarrierError:
1342                results.append(True)
1343
1344        async def coro_reset():
1345            await barrier.reset()
1346
1347        # N-1 tasks waiting on barrier with N parties
1348        tasks  = self.make_tasks(self.N-1, coro)
1349        await asyncio.sleep(0)
1350
1351        # reset the barrier
1352        asyncio.create_task(coro_reset())
1353        await asyncio.gather(*tasks)
1354
1355        self.assertEqual(len(results), self.N-1)
1356        self.assertTrue(all(results))
1357        self.assertEqual(barrier.n_waiting, 0)
1358        self.assertNotIn("resetting", repr(barrier))
1359        self.assertFalse(barrier.broken)
1360
1361    async def test_reset_barrier_when_tasks_half_draining(self):
1362        barrier = asyncio.Barrier(self.N)
1363        results1 = []
1364        rest_of_tasks = self.N//2
1365
1366        async def coro():
1367            try:
1368                await barrier.wait()
1369            except asyncio.BrokenBarrierError:
1370                # catch here waiting tasks
1371                results1.append(True)
1372            else:
1373                # here drained task ouside the barrier
1374                if rest_of_tasks == barrier._count:
1375                    # tasks outside the barrier
1376                    await barrier.reset()
1377
1378        await self.gather_tasks(self.N, coro)
1379
1380        self.assertEqual(results1, [True]*rest_of_tasks)
1381        self.assertEqual(barrier.n_waiting, 0)
1382        self.assertNotIn("resetting", repr(barrier))
1383        self.assertFalse(barrier.broken)
1384
1385    async def test_reset_barrier_when_tasks_half_draining_half_blocking(self):
1386        barrier = asyncio.Barrier(self.N)
1387        results1 = []
1388        results2 = []
1389        blocking_tasks = self.N//2
1390        count = 0
1391
1392        async def coro():
1393            nonlocal count
1394            try:
1395                await barrier.wait()
1396            except asyncio.BrokenBarrierError:
1397                # here catch still waiting tasks
1398                results1.append(True)
1399
1400                # so now waiting again to reach nb_parties
1401                await barrier.wait()
1402            else:
1403                count += 1
1404                if count > blocking_tasks:
1405                    # reset now: raise asyncio.BrokenBarrierError for waiting tasks
1406                    await barrier.reset()
1407
1408                    # so now waiting again to reach nb_parties
1409                    await barrier.wait()
1410                else:
1411                    try:
1412                        await barrier.wait()
1413                    except asyncio.BrokenBarrierError:
1414                        # here no catch - blocked tasks go to wait
1415                        results2.append(True)
1416
1417        await self.gather_tasks(self.N, coro)
1418
1419        self.assertEqual(results1, [True]*blocking_tasks)
1420        self.assertEqual(results2, [])
1421        self.assertEqual(barrier.n_waiting, 0)
1422        self.assertNotIn("resetting", repr(barrier))
1423        self.assertFalse(barrier.broken)
1424
1425    async def test_reset_barrier_while_tasks_waiting_and_waiting_again(self):
1426        barrier = asyncio.Barrier(self.N)
1427        results1 = []
1428        results2 = []
1429
1430        async def coro1():
1431            try:
1432                await barrier.wait()
1433            except asyncio.BrokenBarrierError:
1434                results1.append(True)
1435            finally:
1436                await barrier.wait()
1437                results2.append(True)
1438
1439        async def coro2():
1440            async with barrier:
1441                results2.append(True)
1442
1443        tasks = self.make_tasks(self.N-1, coro1)
1444
1445        # reset barrier, N-1 waiting tasks raise an BrokenBarrierError
1446        asyncio.create_task(barrier.reset())
1447        await asyncio.sleep(0)
1448
1449        # complete waiting tasks in the `finally`
1450        asyncio.create_task(coro2())
1451
1452        await asyncio.gather(*tasks)
1453
1454        self.assertFalse(barrier.broken)
1455        self.assertEqual(len(results1), self.N-1)
1456        self.assertTrue(all(results1))
1457        self.assertEqual(len(results2), self.N)
1458        self.assertTrue(all(results2))
1459
1460        self.assertEqual(barrier.n_waiting, 0)
1461
1462
1463    async def test_reset_barrier_while_tasks_draining(self):
1464        barrier = asyncio.Barrier(self.N)
1465        results1 = []
1466        results2 = []
1467        results3 = []
1468        count = 0
1469
1470        async def coro():
1471            nonlocal count
1472
1473            i = await barrier.wait()
1474            count += 1
1475            if count == self.N:
1476                # last task exited from barrier
1477                await barrier.reset()
1478
1479                # wit here to reach the `parties`
1480                await barrier.wait()
1481            else:
1482                try:
1483                    # second waiting
1484                    await barrier.wait()
1485
1486                    # N-1 tasks here
1487                    results1.append(True)
1488                except Exception as e:
1489                    # never goes here
1490                    results2.append(True)
1491
1492            # Now, pass the barrier again
1493            # last wait, must be completed
1494            k = await barrier.wait()
1495            results3.append(True)
1496
1497        await self.gather_tasks(self.N, coro)
1498
1499        self.assertFalse(barrier.broken)
1500        self.assertTrue(all(results1))
1501        self.assertEqual(len(results1), self.N-1)
1502        self.assertEqual(len(results2), 0)
1503        self.assertEqual(len(results3), self.N)
1504        self.assertTrue(all(results3))
1505
1506        self.assertEqual(barrier.n_waiting, 0)
1507
1508    async def test_abort_barrier(self):
1509        barrier = asyncio.Barrier(1)
1510
1511        asyncio.create_task(barrier.abort())
1512        await asyncio.sleep(0)
1513
1514        self.assertEqual(barrier.n_waiting, 0)
1515        self.assertTrue(barrier.broken)
1516
1517    async def test_abort_barrier_when_tasks_half_draining_half_blocking(self):
1518        barrier = asyncio.Barrier(self.N)
1519        results1 = []
1520        results2 = []
1521        blocking_tasks = self.N//2
1522        count = 0
1523
1524        async def coro():
1525            nonlocal count
1526            try:
1527                await barrier.wait()
1528            except asyncio.BrokenBarrierError:
1529                # here catch tasks waiting to drain
1530                results1.append(True)
1531            else:
1532                count += 1
1533                if count > blocking_tasks:
1534                    # abort now: raise asyncio.BrokenBarrierError for all tasks
1535                    await barrier.abort()
1536                else:
1537                    try:
1538                        await barrier.wait()
1539                    except asyncio.BrokenBarrierError:
1540                        # here catch blocked tasks (already drained)
1541                        results2.append(True)
1542
1543        await self.gather_tasks(self.N, coro)
1544
1545        self.assertTrue(barrier.broken)
1546        self.assertEqual(results1, [True]*blocking_tasks)
1547        self.assertEqual(results2, [True]*(self.N-blocking_tasks-1))
1548        self.assertEqual(barrier.n_waiting, 0)
1549        self.assertNotIn("resetting", repr(barrier))
1550
1551    async def test_abort_barrier_when_exception(self):
1552        # test from threading.Barrier: see `lock_tests.test_reset`
1553        barrier = asyncio.Barrier(self.N)
1554        results1 = []
1555        results2 = []
1556
1557        async def coro():
1558            try:
1559                async with barrier as i :
1560                    if i == self.N//2:
1561                        raise RuntimeError
1562                async with barrier:
1563                    results1.append(True)
1564            except asyncio.BrokenBarrierError:
1565                results2.append(True)
1566            except RuntimeError:
1567                await barrier.abort()
1568
1569        await self.gather_tasks(self.N, coro)
1570
1571        self.assertTrue(barrier.broken)
1572        self.assertEqual(len(results1), 0)
1573        self.assertEqual(len(results2), self.N-1)
1574        self.assertTrue(all(results2))
1575        self.assertEqual(barrier.n_waiting, 0)
1576
1577    async def test_abort_barrier_when_exception_then_resetting(self):
1578        # test from threading.Barrier: see `lock_tests.test_abort_and_reset``
1579        barrier1 = asyncio.Barrier(self.N)
1580        barrier2 = asyncio.Barrier(self.N)
1581        results1 = []
1582        results2 = []
1583        results3 = []
1584
1585        async def coro():
1586            try:
1587                i = await barrier1.wait()
1588                if i == self.N//2:
1589                    raise RuntimeError
1590                await barrier1.wait()
1591                results1.append(True)
1592            except asyncio.BrokenBarrierError:
1593                results2.append(True)
1594            except RuntimeError:
1595                await barrier1.abort()
1596
1597            # Synchronize and reset the barrier.  Must synchronize first so
1598            # that everyone has left it when we reset, and after so that no
1599            # one enters it before the reset.
1600            i = await barrier2.wait()
1601            if  i == self.N//2:
1602                await barrier1.reset()
1603            await barrier2.wait()
1604            await barrier1.wait()
1605            results3.append(True)
1606
1607        await self.gather_tasks(self.N, coro)
1608
1609        self.assertFalse(barrier1.broken)
1610        self.assertEqual(len(results1), 0)
1611        self.assertEqual(len(results2), self.N-1)
1612        self.assertTrue(all(results2))
1613        self.assertEqual(len(results3), self.N)
1614        self.assertTrue(all(results3))
1615
1616        self.assertEqual(barrier1.n_waiting, 0)
1617
1618
1619if __name__ == '__main__':
1620    unittest.main()
1621