1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import traceback
14import unittest
15from unittest import mock
16from types import GenericAlias
17
18import asyncio
19from asyncio import coroutines
20from asyncio import futures
21from asyncio import tasks
22from test.test_asyncio import utils as test_utils
23from test import support
24from test.support.script_helper import assert_python_ok
25
26
27def tearDownModule():
28    asyncio.set_event_loop_policy(None)
29
30
31async def coroutine_function():
32    pass
33
34
35def format_coroutine(qualname, state, src, source_traceback, generator=False):
36    if generator:
37        state = '%s' % state
38    else:
39        state = '%s, defined' % state
40    if source_traceback is not None:
41        frame = source_traceback[-1]
42        return ('coro=<%s() %s at %s> created at %s:%s'
43                % (qualname, state, src, frame[0], frame[1]))
44    else:
45        return 'coro=<%s() %s at %s>' % (qualname, state, src)
46
47
48def get_innermost_context(exc):
49    """
50    Return information about the innermost exception context in the chain.
51    """
52    depth = 0
53    while True:
54        context = exc.__context__
55        if context is None:
56            break
57
58        exc = context
59        depth += 1
60
61    return (type(exc), exc.args, depth)
62
63
64class Dummy:
65
66    def __repr__(self):
67        return '<Dummy>'
68
69    def __call__(self, *args):
70        pass
71
72
73class CoroLikeObject:
74    def send(self, v):
75        raise StopIteration(42)
76
77    def throw(self, *exc):
78        pass
79
80    def close(self):
81        pass
82
83    def __await__(self):
84        return self
85
86
87class BaseTaskTests:
88
89    Task = None
90    Future = None
91
92    def new_task(self, loop, coro, name='TestTask', context=None):
93        return self.__class__.Task(coro, loop=loop, name=name, context=context)
94
95    def new_future(self, loop):
96        return self.__class__.Future(loop=loop)
97
98    def setUp(self):
99        super().setUp()
100        self.loop = self.new_test_loop()
101        self.loop.set_task_factory(self.new_task)
102        self.loop.create_future = lambda: self.new_future(self.loop)
103
104    def test_generic_alias(self):
105        task = self.__class__.Task[str]
106        self.assertEqual(task.__args__, (str,))
107        self.assertIsInstance(task, GenericAlias)
108
109    def test_task_cancel_message_getter(self):
110        async def coro():
111            pass
112        t = self.new_task(self.loop, coro())
113        self.assertTrue(hasattr(t, '_cancel_message'))
114        self.assertEqual(t._cancel_message, None)
115
116        t.cancel('my message')
117        self.assertEqual(t._cancel_message, 'my message')
118
119        with self.assertRaises(asyncio.CancelledError) as cm:
120            self.loop.run_until_complete(t)
121
122        self.assertEqual('my message', cm.exception.args[0])
123
124    def test_task_cancel_message_setter(self):
125        async def coro():
126            pass
127        t = self.new_task(self.loop, coro())
128        t.cancel('my message')
129        t._cancel_message = 'my new message'
130        self.assertEqual(t._cancel_message, 'my new message')
131
132        with self.assertRaises(asyncio.CancelledError) as cm:
133            self.loop.run_until_complete(t)
134
135        self.assertEqual('my new message', cm.exception.args[0])
136
137    def test_task_del_collect(self):
138        class Evil:
139            def __del__(self):
140                gc.collect()
141
142        async def run():
143            return Evil()
144
145        self.loop.run_until_complete(
146            asyncio.gather(*[
147                self.new_task(self.loop, run()) for _ in range(100)
148            ]))
149
150    def test_other_loop_future(self):
151        other_loop = asyncio.new_event_loop()
152        fut = self.new_future(other_loop)
153
154        async def run(fut):
155            await fut
156
157        try:
158            with self.assertRaisesRegex(RuntimeError,
159                                        r'Task .* got Future .* attached'):
160                self.loop.run_until_complete(run(fut))
161        finally:
162            other_loop.close()
163
164    def test_task_awaits_on_itself(self):
165
166        async def test():
167            await task
168
169        task = asyncio.ensure_future(test(), loop=self.loop)
170
171        with self.assertRaisesRegex(RuntimeError,
172                                    'Task cannot await on itself'):
173            self.loop.run_until_complete(task)
174
175    def test_task_class(self):
176        async def notmuch():
177            return 'ok'
178        t = self.new_task(self.loop, notmuch())
179        self.loop.run_until_complete(t)
180        self.assertTrue(t.done())
181        self.assertEqual(t.result(), 'ok')
182        self.assertIs(t._loop, self.loop)
183        self.assertIs(t.get_loop(), self.loop)
184
185        loop = asyncio.new_event_loop()
186        self.set_event_loop(loop)
187        t = self.new_task(loop, notmuch())
188        self.assertIs(t._loop, loop)
189        loop.run_until_complete(t)
190        loop.close()
191
192    def test_ensure_future_coroutine(self):
193        async def notmuch():
194            return 'ok'
195        t = asyncio.ensure_future(notmuch(), loop=self.loop)
196        self.assertIs(t._loop, self.loop)
197        self.loop.run_until_complete(t)
198        self.assertTrue(t.done())
199        self.assertEqual(t.result(), 'ok')
200
201        a = notmuch()
202        self.addCleanup(a.close)
203        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
204            asyncio.ensure_future(a)
205
206        async def test():
207            return asyncio.ensure_future(notmuch())
208        t = self.loop.run_until_complete(test())
209        self.assertIs(t._loop, self.loop)
210        self.loop.run_until_complete(t)
211        self.assertTrue(t.done())
212        self.assertEqual(t.result(), 'ok')
213
214        # Deprecated in 3.10, undeprecated in 3.11.1
215        asyncio.set_event_loop(self.loop)
216        self.addCleanup(asyncio.set_event_loop, None)
217        t = asyncio.ensure_future(notmuch())
218        self.assertIs(t._loop, self.loop)
219        self.loop.run_until_complete(t)
220        self.assertTrue(t.done())
221        self.assertEqual(t.result(), 'ok')
222
223    def test_ensure_future_future(self):
224        f_orig = self.new_future(self.loop)
225        f_orig.set_result('ko')
226
227        f = asyncio.ensure_future(f_orig)
228        self.loop.run_until_complete(f)
229        self.assertTrue(f.done())
230        self.assertEqual(f.result(), 'ko')
231        self.assertIs(f, f_orig)
232
233        loop = asyncio.new_event_loop()
234        self.set_event_loop(loop)
235
236        with self.assertRaises(ValueError):
237            f = asyncio.ensure_future(f_orig, loop=loop)
238
239        loop.close()
240
241        f = asyncio.ensure_future(f_orig, loop=self.loop)
242        self.assertIs(f, f_orig)
243
244    def test_ensure_future_task(self):
245        async def notmuch():
246            return 'ok'
247        t_orig = self.new_task(self.loop, notmuch())
248        t = asyncio.ensure_future(t_orig)
249        self.loop.run_until_complete(t)
250        self.assertTrue(t.done())
251        self.assertEqual(t.result(), 'ok')
252        self.assertIs(t, t_orig)
253
254        loop = asyncio.new_event_loop()
255        self.set_event_loop(loop)
256
257        with self.assertRaises(ValueError):
258            t = asyncio.ensure_future(t_orig, loop=loop)
259
260        loop.close()
261
262        t = asyncio.ensure_future(t_orig, loop=self.loop)
263        self.assertIs(t, t_orig)
264
265    def test_ensure_future_awaitable(self):
266        class Aw:
267            def __init__(self, coro):
268                self.coro = coro
269            def __await__(self):
270                return self.coro.__await__()
271
272        async def coro():
273            return 'ok'
274
275        loop = asyncio.new_event_loop()
276        self.set_event_loop(loop)
277        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
278        loop.run_until_complete(fut)
279        self.assertEqual(fut.result(), 'ok')
280
281    def test_ensure_future_neither(self):
282        with self.assertRaises(TypeError):
283            asyncio.ensure_future('ok')
284
285    def test_ensure_future_error_msg(self):
286        loop = asyncio.new_event_loop()
287        f = self.new_future(self.loop)
288        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
289                                    'different loop than the one specified as '
290                                    'the loop argument'):
291            asyncio.ensure_future(f, loop=loop)
292        loop.close()
293
294    def test_get_stack(self):
295        T = None
296
297        async def foo():
298            await bar()
299
300        async def bar():
301            # test get_stack()
302            f = T.get_stack(limit=1)
303            try:
304                self.assertEqual(f[0].f_code.co_name, 'foo')
305            finally:
306                f = None
307
308            # test print_stack()
309            file = io.StringIO()
310            T.print_stack(limit=1, file=file)
311            file.seek(0)
312            tb = file.read()
313            self.assertRegex(tb, r'foo\(\) running')
314
315        async def runner():
316            nonlocal T
317            T = asyncio.ensure_future(foo(), loop=self.loop)
318            await T
319
320        self.loop.run_until_complete(runner())
321
322    def test_task_repr(self):
323        self.loop.set_debug(False)
324
325        async def notmuch():
326            return 'abc'
327
328        # test coroutine function
329        self.assertEqual(notmuch.__name__, 'notmuch')
330        self.assertRegex(notmuch.__qualname__,
331                         r'\w+.test_task_repr.<locals>.notmuch')
332        self.assertEqual(notmuch.__module__, __name__)
333
334        filename, lineno = test_utils.get_function_source(notmuch)
335        src = "%s:%s" % (filename, lineno)
336
337        # test coroutine object
338        gen = notmuch()
339        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
340        self.assertEqual(gen.__name__, 'notmuch')
341        self.assertEqual(gen.__qualname__, coro_qualname)
342
343        # test pending Task
344        t = self.new_task(self.loop, gen)
345        t.add_done_callback(Dummy())
346
347        coro = format_coroutine(coro_qualname, 'running', src,
348                                t._source_traceback, generator=True)
349        self.assertEqual(repr(t),
350                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
351
352        # test cancelling Task
353        t.cancel()  # Does not take immediate effect!
354        self.assertEqual(repr(t),
355                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
356
357        # test cancelled Task
358        self.assertRaises(asyncio.CancelledError,
359                          self.loop.run_until_complete, t)
360        coro = format_coroutine(coro_qualname, 'done', src,
361                                t._source_traceback)
362        self.assertEqual(repr(t),
363                         "<Task cancelled name='TestTask' %s>" % coro)
364
365        # test finished Task
366        t = self.new_task(self.loop, notmuch())
367        self.loop.run_until_complete(t)
368        coro = format_coroutine(coro_qualname, 'done', src,
369                                t._source_traceback)
370        self.assertEqual(repr(t),
371                         "<Task finished name='TestTask' %s result='abc'>" % coro)
372
373    def test_task_repr_autogenerated(self):
374        async def notmuch():
375            return 123
376
377        t1 = self.new_task(self.loop, notmuch(), None)
378        t2 = self.new_task(self.loop, notmuch(), None)
379        self.assertNotEqual(repr(t1), repr(t2))
380
381        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
382        self.assertIsNotNone(match1)
383        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
384        self.assertIsNotNone(match2)
385
386        # Autogenerated task names should have monotonically increasing numbers
387        self.assertLess(int(match1.group(1)), int(match2.group(1)))
388        self.loop.run_until_complete(t1)
389        self.loop.run_until_complete(t2)
390
391    def test_task_repr_name_not_str(self):
392        async def notmuch():
393            return 123
394
395        t = self.new_task(self.loop, notmuch())
396        t.set_name({6})
397        self.assertEqual(t.get_name(), '{6}')
398        self.loop.run_until_complete(t)
399
400    def test_task_repr_wait_for(self):
401        self.loop.set_debug(False)
402
403        async def wait_for(fut):
404            return await fut
405
406        fut = self.new_future(self.loop)
407        task = self.new_task(self.loop, wait_for(fut))
408        test_utils.run_briefly(self.loop)
409        self.assertRegex(repr(task),
410                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
411
412        fut.set_result(None)
413        self.loop.run_until_complete(task)
414
415    def test_task_basics(self):
416
417        async def outer():
418            a = await inner1()
419            b = await inner2()
420            return a+b
421
422        async def inner1():
423            return 42
424
425        async def inner2():
426            return 1000
427
428        t = outer()
429        self.assertEqual(self.loop.run_until_complete(t), 1042)
430
431    def test_exception_chaining_after_await(self):
432        # Test that when awaiting on a task when an exception is already
433        # active, if the task raises an exception it will be chained
434        # with the original.
435        loop = asyncio.new_event_loop()
436        self.set_event_loop(loop)
437
438        async def raise_error():
439            raise ValueError
440
441        async def run():
442            try:
443                raise KeyError(3)
444            except Exception as exc:
445                task = self.new_task(loop, raise_error())
446                try:
447                    await task
448                except Exception as exc:
449                    self.assertEqual(type(exc), ValueError)
450                    chained = exc.__context__
451                    self.assertEqual((type(chained), chained.args),
452                        (KeyError, (3,)))
453
454        try:
455            task = self.new_task(loop, run())
456            loop.run_until_complete(task)
457        finally:
458            loop.close()
459
460    def test_exception_chaining_after_await_with_context_cycle(self):
461        # Check trying to create an exception context cycle:
462        # https://bugs.python.org/issue40696
463        has_cycle = None
464        loop = asyncio.new_event_loop()
465        self.set_event_loop(loop)
466
467        async def process_exc(exc):
468            raise exc
469
470        async def run():
471            nonlocal has_cycle
472            try:
473                raise KeyError('a')
474            except Exception as exc:
475                task = self.new_task(loop, process_exc(exc))
476                try:
477                    await task
478                except BaseException as exc:
479                    has_cycle = (exc is exc.__context__)
480                    # Prevent a hang if has_cycle is True.
481                    exc.__context__ = None
482
483        try:
484            task = self.new_task(loop, run())
485            loop.run_until_complete(task)
486        finally:
487            loop.close()
488        # This also distinguishes from the initial has_cycle=None.
489        self.assertEqual(has_cycle, False)
490
491
492    def test_cancelling(self):
493        loop = asyncio.new_event_loop()
494
495        async def task():
496            await asyncio.sleep(10)
497
498        try:
499            t = self.new_task(loop, task())
500            self.assertFalse(t.cancelling())
501            self.assertNotIn(" cancelling ", repr(t))
502            self.assertTrue(t.cancel())
503            self.assertTrue(t.cancelling())
504            self.assertIn(" cancelling ", repr(t))
505
506            # Since we commented out two lines from Task.cancel(),
507            # this t.cancel() call now returns True.
508            # self.assertFalse(t.cancel())
509            self.assertTrue(t.cancel())
510
511            with self.assertRaises(asyncio.CancelledError):
512                loop.run_until_complete(t)
513        finally:
514            loop.close()
515
516    def test_uncancel_basic(self):
517        loop = asyncio.new_event_loop()
518
519        async def task():
520            try:
521                await asyncio.sleep(10)
522            except asyncio.CancelledError:
523                asyncio.current_task().uncancel()
524                await asyncio.sleep(10)
525
526        try:
527            t = self.new_task(loop, task())
528            loop.run_until_complete(asyncio.sleep(0.01))
529
530            # Cancel first sleep
531            self.assertTrue(t.cancel())
532            self.assertIn(" cancelling ", repr(t))
533            self.assertEqual(t.cancelling(), 1)
534            self.assertFalse(t.cancelled())  # Task is still not complete
535            loop.run_until_complete(asyncio.sleep(0.01))
536
537            # after .uncancel()
538            self.assertNotIn(" cancelling ", repr(t))
539            self.assertEqual(t.cancelling(), 0)
540            self.assertFalse(t.cancelled())  # Task is still not complete
541
542            # Cancel second sleep
543            self.assertTrue(t.cancel())
544            self.assertEqual(t.cancelling(), 1)
545            self.assertFalse(t.cancelled())  # Task is still not complete
546            with self.assertRaises(asyncio.CancelledError):
547                loop.run_until_complete(t)
548            self.assertTrue(t.cancelled())  # Finally, task complete
549            self.assertTrue(t.done())
550
551            # uncancel is no longer effective after the task is complete
552            t.uncancel()
553            self.assertTrue(t.cancelled())
554            self.assertTrue(t.done())
555        finally:
556            loop.close()
557
558    def test_uncancel_structured_blocks(self):
559        # This test recreates the following high-level structure using uncancel()::
560        #
561        #     async def make_request_with_timeout():
562        #         try:
563        #             async with asyncio.timeout(1):
564        #                 # Structured block affected by the timeout:
565        #                 await make_request()
566        #                 await make_another_request()
567        #         except TimeoutError:
568        #             pass  # There was a timeout
569        #         # Outer code not affected by the timeout:
570        #         await unrelated_code()
571
572        loop = asyncio.new_event_loop()
573
574        async def make_request_with_timeout(*, sleep: float, timeout: float):
575            task = asyncio.current_task()
576            loop = task.get_loop()
577
578            timed_out = False
579            structured_block_finished = False
580            outer_code_reached = False
581
582            def on_timeout():
583                nonlocal timed_out
584                timed_out = True
585                task.cancel()
586
587            timeout_handle = loop.call_later(timeout, on_timeout)
588            try:
589                try:
590                    # Structured block affected by the timeout
591                    await asyncio.sleep(sleep)
592                    structured_block_finished = True
593                finally:
594                    timeout_handle.cancel()
595                    if (
596                        timed_out
597                        and task.uncancel() == 0
598                        and sys.exc_info()[0] is asyncio.CancelledError
599                    ):
600                        # Note the five rules that are needed here to satisfy proper
601                        # uncancellation:
602                        #
603                        # 1. handle uncancellation in a `finally:` block to allow for
604                        #    plain returns;
605                        # 2. our `timed_out` flag is set, meaning that it was our event
606                        #    that triggered the need to uncancel the task, regardless of
607                        #    what exception is raised;
608                        # 3. we can call `uncancel()` because *we* called `cancel()`
609                        #    before;
610                        # 4. we call `uncancel()` but we only continue converting the
611                        #    CancelledError to TimeoutError if `uncancel()` caused the
612                        #    cancellation request count go down to 0.  We need to look
613                        #    at the counter vs having a simple boolean flag because our
614                        #    code might have been nested (think multiple timeouts). See
615                        #    commit 7fce1063b6e5a366f8504e039a8ccdd6944625cd for
616                        #    details.
617                        # 5. we only convert CancelledError to TimeoutError; for other
618                        #    exceptions raised due to the cancellation (like
619                        #    a ConnectionLostError from a database client), simply
620                        #    propagate them.
621                        #
622                        # Those checks need to take place in this exact order to make
623                        # sure the `cancelling()` counter always stays in sync.
624                        #
625                        # Additionally, the original stimulus to `cancel()` the task
626                        # needs to be unscheduled to avoid re-cancelling the task later.
627                        # Here we do it by cancelling `timeout_handle` in the `finally:`
628                        # block.
629                        raise TimeoutError
630            except TimeoutError:
631                self.assertTrue(timed_out)
632
633            # Outer code not affected by the timeout:
634            outer_code_reached = True
635            await asyncio.sleep(0)
636            return timed_out, structured_block_finished, outer_code_reached
637
638        try:
639            # Test which timed out.
640            t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
641            timed_out, structured_block_finished, outer_code_reached = (
642                loop.run_until_complete(t1)
643            )
644            self.assertTrue(timed_out)
645            self.assertFalse(structured_block_finished)  # it was cancelled
646            self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
647                                                 # the structured block and continued until
648                                                 # completion
649            self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
650
651            # Test which did not time out.
652            t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
653            timed_out, structured_block_finished, outer_code_reached = (
654                loop.run_until_complete(t2)
655            )
656            self.assertFalse(timed_out)
657            self.assertTrue(structured_block_finished)
658            self.assertTrue(outer_code_reached)
659            self.assertEqual(t2.cancelling(), 0)
660        finally:
661            loop.close()
662
663    def test_cancel(self):
664
665        def gen():
666            when = yield
667            self.assertAlmostEqual(10.0, when)
668            yield 0
669
670        loop = self.new_test_loop(gen)
671
672        async def task():
673            await asyncio.sleep(10.0)
674            return 12
675
676        t = self.new_task(loop, task())
677        loop.call_soon(t.cancel)
678        with self.assertRaises(asyncio.CancelledError):
679            loop.run_until_complete(t)
680        self.assertTrue(t.done())
681        self.assertTrue(t.cancelled())
682        self.assertFalse(t.cancel())
683
684    def test_cancel_with_message_then_future_result(self):
685        # Test Future.result() after calling cancel() with a message.
686        cases = [
687            ((), ()),
688            ((None,), ()),
689            (('my message',), ('my message',)),
690            # Non-string values should roundtrip.
691            ((5,), (5,)),
692        ]
693        for cancel_args, expected_args in cases:
694            with self.subTest(cancel_args=cancel_args):
695                loop = asyncio.new_event_loop()
696                self.set_event_loop(loop)
697
698                async def sleep():
699                    await asyncio.sleep(10)
700
701                async def coro():
702                    task = self.new_task(loop, sleep())
703                    await asyncio.sleep(0)
704                    task.cancel(*cancel_args)
705                    done, pending = await asyncio.wait([task])
706                    task.result()
707
708                task = self.new_task(loop, coro())
709                with self.assertRaises(asyncio.CancelledError) as cm:
710                    loop.run_until_complete(task)
711                exc = cm.exception
712                self.assertEqual(exc.args, expected_args)
713
714                actual = get_innermost_context(exc)
715                self.assertEqual(actual,
716                    (asyncio.CancelledError, expected_args, 0))
717
718    def test_cancel_with_message_then_future_exception(self):
719        # Test Future.exception() after calling cancel() with a message.
720        cases = [
721            ((), ()),
722            ((None,), ()),
723            (('my message',), ('my message',)),
724            # Non-string values should roundtrip.
725            ((5,), (5,)),
726        ]
727        for cancel_args, expected_args in cases:
728            with self.subTest(cancel_args=cancel_args):
729                loop = asyncio.new_event_loop()
730                self.set_event_loop(loop)
731
732                async def sleep():
733                    await asyncio.sleep(10)
734
735                async def coro():
736                    task = self.new_task(loop, sleep())
737                    await asyncio.sleep(0)
738                    task.cancel(*cancel_args)
739                    done, pending = await asyncio.wait([task])
740                    task.exception()
741
742                task = self.new_task(loop, coro())
743                with self.assertRaises(asyncio.CancelledError) as cm:
744                    loop.run_until_complete(task)
745                exc = cm.exception
746                self.assertEqual(exc.args, expected_args)
747
748                actual = get_innermost_context(exc)
749                self.assertEqual(actual,
750                    (asyncio.CancelledError, expected_args, 0))
751
752    def test_cancellation_exception_context(self):
753        loop = asyncio.new_event_loop()
754        self.set_event_loop(loop)
755        fut = loop.create_future()
756
757        async def sleep():
758            fut.set_result(None)
759            await asyncio.sleep(10)
760
761        async def coro():
762            inner_task = self.new_task(loop, sleep())
763            await fut
764            loop.call_soon(inner_task.cancel, 'msg')
765            try:
766                await inner_task
767            except asyncio.CancelledError as ex:
768                raise ValueError("cancelled") from ex
769
770        task = self.new_task(loop, coro())
771        with self.assertRaises(ValueError) as cm:
772            loop.run_until_complete(task)
773        exc = cm.exception
774        self.assertEqual(exc.args, ('cancelled',))
775
776        actual = get_innermost_context(exc)
777        self.assertEqual(actual,
778            (asyncio.CancelledError, ('msg',), 1))
779
780    def test_cancel_with_message_before_starting_task(self):
781        loop = asyncio.new_event_loop()
782        self.set_event_loop(loop)
783
784        async def sleep():
785            await asyncio.sleep(10)
786
787        async def coro():
788            task = self.new_task(loop, sleep())
789            # We deliberately leave out the sleep here.
790            task.cancel('my message')
791            done, pending = await asyncio.wait([task])
792            task.exception()
793
794        task = self.new_task(loop, coro())
795        with self.assertRaises(asyncio.CancelledError) as cm:
796            loop.run_until_complete(task)
797        exc = cm.exception
798        self.assertEqual(exc.args, ('my message',))
799
800        actual = get_innermost_context(exc)
801        self.assertEqual(actual,
802            (asyncio.CancelledError, ('my message',), 0))
803
804    def test_cancel_yield(self):
805        async def task():
806            await asyncio.sleep(0)
807            await asyncio.sleep(0)
808            return 12
809
810        t = self.new_task(self.loop, task())
811        test_utils.run_briefly(self.loop)  # start coro
812        t.cancel()
813        self.assertRaises(
814            asyncio.CancelledError, self.loop.run_until_complete, t)
815        self.assertTrue(t.done())
816        self.assertTrue(t.cancelled())
817        self.assertFalse(t.cancel())
818
819    def test_cancel_inner_future(self):
820        f = self.new_future(self.loop)
821
822        async def task():
823            await f
824            return 12
825
826        t = self.new_task(self.loop, task())
827        test_utils.run_briefly(self.loop)  # start task
828        f.cancel()
829        with self.assertRaises(asyncio.CancelledError):
830            self.loop.run_until_complete(t)
831        self.assertTrue(f.cancelled())
832        self.assertTrue(t.cancelled())
833
834    def test_cancel_both_task_and_inner_future(self):
835        f = self.new_future(self.loop)
836
837        async def task():
838            await f
839            return 12
840
841        t = self.new_task(self.loop, task())
842        test_utils.run_briefly(self.loop)
843
844        f.cancel()
845        t.cancel()
846
847        with self.assertRaises(asyncio.CancelledError):
848            self.loop.run_until_complete(t)
849
850        self.assertTrue(t.done())
851        self.assertTrue(f.cancelled())
852        self.assertTrue(t.cancelled())
853
854    def test_cancel_task_catching(self):
855        fut1 = self.new_future(self.loop)
856        fut2 = self.new_future(self.loop)
857
858        async def task():
859            await fut1
860            try:
861                await fut2
862            except asyncio.CancelledError:
863                return 42
864
865        t = self.new_task(self.loop, task())
866        test_utils.run_briefly(self.loop)
867        self.assertIs(t._fut_waiter, fut1)  # White-box test.
868        fut1.set_result(None)
869        test_utils.run_briefly(self.loop)
870        self.assertIs(t._fut_waiter, fut2)  # White-box test.
871        t.cancel()
872        self.assertTrue(fut2.cancelled())
873        res = self.loop.run_until_complete(t)
874        self.assertEqual(res, 42)
875        self.assertFalse(t.cancelled())
876
877    def test_cancel_task_ignoring(self):
878        fut1 = self.new_future(self.loop)
879        fut2 = self.new_future(self.loop)
880        fut3 = self.new_future(self.loop)
881
882        async def task():
883            await fut1
884            try:
885                await fut2
886            except asyncio.CancelledError:
887                pass
888            res = await fut3
889            return res
890
891        t = self.new_task(self.loop, task())
892        test_utils.run_briefly(self.loop)
893        self.assertIs(t._fut_waiter, fut1)  # White-box test.
894        fut1.set_result(None)
895        test_utils.run_briefly(self.loop)
896        self.assertIs(t._fut_waiter, fut2)  # White-box test.
897        t.cancel()
898        self.assertTrue(fut2.cancelled())
899        test_utils.run_briefly(self.loop)
900        self.assertIs(t._fut_waiter, fut3)  # White-box test.
901        fut3.set_result(42)
902        res = self.loop.run_until_complete(t)
903        self.assertEqual(res, 42)
904        self.assertFalse(fut3.cancelled())
905        self.assertFalse(t.cancelled())
906
907    def test_cancel_current_task(self):
908        loop = asyncio.new_event_loop()
909        self.set_event_loop(loop)
910
911        async def task():
912            t.cancel()
913            self.assertTrue(t._must_cancel)  # White-box test.
914            # The sleep should be cancelled immediately.
915            await asyncio.sleep(100)
916            return 12
917
918        t = self.new_task(loop, task())
919        self.assertFalse(t.cancelled())
920        self.assertRaises(
921            asyncio.CancelledError, loop.run_until_complete, t)
922        self.assertTrue(t.done())
923        self.assertTrue(t.cancelled())
924        self.assertFalse(t._must_cancel)  # White-box test.
925        self.assertFalse(t.cancel())
926
927    def test_cancel_at_end(self):
928        """coroutine end right after task is cancelled"""
929        loop = asyncio.new_event_loop()
930        self.set_event_loop(loop)
931
932        async def task():
933            t.cancel()
934            self.assertTrue(t._must_cancel)  # White-box test.
935            return 12
936
937        t = self.new_task(loop, task())
938        self.assertFalse(t.cancelled())
939        self.assertRaises(
940            asyncio.CancelledError, loop.run_until_complete, t)
941        self.assertTrue(t.done())
942        self.assertTrue(t.cancelled())
943        self.assertFalse(t._must_cancel)  # White-box test.
944        self.assertFalse(t.cancel())
945
946    def test_cancel_awaited_task(self):
947        # This tests for a relatively rare condition when
948        # a task cancellation is requested for a task which is not
949        # currently blocked, such as a task cancelling itself.
950        # In this situation we must ensure that whatever next future
951        # or task the cancelled task blocks on is cancelled correctly
952        # as well.  See also bpo-34872.
953        loop = asyncio.new_event_loop()
954        self.addCleanup(lambda: loop.close())
955
956        task = nested_task = None
957        fut = self.new_future(loop)
958
959        async def nested():
960            await fut
961
962        async def coro():
963            nonlocal nested_task
964            # Create a sub-task and wait for it to run.
965            nested_task = self.new_task(loop, nested())
966            await asyncio.sleep(0)
967
968            # Request the current task to be cancelled.
969            task.cancel()
970            # Block on the nested task, which should be immediately
971            # cancelled.
972            await nested_task
973
974        task = self.new_task(loop, coro())
975        with self.assertRaises(asyncio.CancelledError):
976            loop.run_until_complete(task)
977
978        self.assertTrue(task.cancelled())
979        self.assertTrue(nested_task.cancelled())
980        self.assertTrue(fut.cancelled())
981
982    def assert_text_contains(self, text, substr):
983        if substr not in text:
984            raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
985
986    def test_cancel_traceback_for_future_result(self):
987        # When calling Future.result() on a cancelled task, check that the
988        # line of code that was interrupted is included in the traceback.
989        loop = asyncio.new_event_loop()
990        self.set_event_loop(loop)
991
992        async def nested():
993            # This will get cancelled immediately.
994            await asyncio.sleep(10)
995
996        async def coro():
997            task = self.new_task(loop, nested())
998            await asyncio.sleep(0)
999            task.cancel()
1000            await task  # search target
1001
1002        task = self.new_task(loop, coro())
1003        try:
1004            loop.run_until_complete(task)
1005        except asyncio.CancelledError:
1006            tb = traceback.format_exc()
1007            self.assert_text_contains(tb, "await asyncio.sleep(10)")
1008            # The intermediate await should also be included.
1009            self.assert_text_contains(tb, "await task  # search target")
1010        else:
1011            self.fail('CancelledError did not occur')
1012
1013    def test_cancel_traceback_for_future_exception(self):
1014        # When calling Future.exception() on a cancelled task, check that the
1015        # line of code that was interrupted is included in the traceback.
1016        loop = asyncio.new_event_loop()
1017        self.set_event_loop(loop)
1018
1019        async def nested():
1020            # This will get cancelled immediately.
1021            await asyncio.sleep(10)
1022
1023        async def coro():
1024            task = self.new_task(loop, nested())
1025            await asyncio.sleep(0)
1026            task.cancel()
1027            done, pending = await asyncio.wait([task])
1028            task.exception()  # search target
1029
1030        task = self.new_task(loop, coro())
1031        try:
1032            loop.run_until_complete(task)
1033        except asyncio.CancelledError:
1034            tb = traceback.format_exc()
1035            self.assert_text_contains(tb, "await asyncio.sleep(10)")
1036            # The intermediate await should also be included.
1037            self.assert_text_contains(tb,
1038                "task.exception()  # search target")
1039        else:
1040            self.fail('CancelledError did not occur')
1041
1042    def test_stop_while_run_in_complete(self):
1043
1044        def gen():
1045            when = yield
1046            self.assertAlmostEqual(0.1, when)
1047            when = yield 0.1
1048            self.assertAlmostEqual(0.2, when)
1049            when = yield 0.1
1050            self.assertAlmostEqual(0.3, when)
1051            yield 0.1
1052
1053        loop = self.new_test_loop(gen)
1054
1055        x = 0
1056
1057        async def task():
1058            nonlocal x
1059            while x < 10:
1060                await asyncio.sleep(0.1)
1061                x += 1
1062                if x == 2:
1063                    loop.stop()
1064
1065        t = self.new_task(loop, task())
1066        with self.assertRaises(RuntimeError) as cm:
1067            loop.run_until_complete(t)
1068        self.assertEqual(str(cm.exception),
1069                         'Event loop stopped before Future completed.')
1070        self.assertFalse(t.done())
1071        self.assertEqual(x, 2)
1072        self.assertAlmostEqual(0.3, loop.time())
1073
1074        t.cancel()
1075        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
1076
1077    def test_log_traceback(self):
1078        async def coro():
1079            pass
1080
1081        task = self.new_task(self.loop, coro())
1082        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
1083            task._log_traceback = True
1084        self.loop.run_until_complete(task)
1085
1086    def test_wait(self):
1087
1088        def gen():
1089            when = yield
1090            self.assertAlmostEqual(0.1, when)
1091            when = yield 0
1092            self.assertAlmostEqual(0.15, when)
1093            yield 0.15
1094
1095        loop = self.new_test_loop(gen)
1096
1097        a = self.new_task(loop, asyncio.sleep(0.1))
1098        b = self.new_task(loop, asyncio.sleep(0.15))
1099
1100        async def foo():
1101            done, pending = await asyncio.wait([b, a])
1102            self.assertEqual(done, set([a, b]))
1103            self.assertEqual(pending, set())
1104            return 42
1105
1106        res = loop.run_until_complete(self.new_task(loop, foo()))
1107        self.assertEqual(res, 42)
1108        self.assertAlmostEqual(0.15, loop.time())
1109
1110        # Doing it again should take no time and exercise a different path.
1111        res = loop.run_until_complete(self.new_task(loop, foo()))
1112        self.assertAlmostEqual(0.15, loop.time())
1113        self.assertEqual(res, 42)
1114
1115    def test_wait_duplicate_coroutines(self):
1116
1117        async def coro(s):
1118            return s
1119        c = self.loop.create_task(coro('test'))
1120        task = self.new_task(
1121            self.loop,
1122            asyncio.wait([c, c, self.loop.create_task(coro('spam'))]))
1123
1124        done, pending = self.loop.run_until_complete(task)
1125
1126        self.assertFalse(pending)
1127        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
1128
1129    def test_wait_errors(self):
1130        self.assertRaises(
1131            ValueError, self.loop.run_until_complete,
1132            asyncio.wait(set()))
1133
1134        # -1 is an invalid return_when value
1135        sleep_coro = asyncio.sleep(10.0)
1136        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1137        self.assertRaises(ValueError,
1138                          self.loop.run_until_complete, wait_coro)
1139
1140        sleep_coro.close()
1141
1142    def test_wait_first_completed(self):
1143
1144        def gen():
1145            when = yield
1146            self.assertAlmostEqual(10.0, when)
1147            when = yield 0
1148            self.assertAlmostEqual(0.1, when)
1149            yield 0.1
1150
1151        loop = self.new_test_loop(gen)
1152
1153        a = self.new_task(loop, asyncio.sleep(10.0))
1154        b = self.new_task(loop, asyncio.sleep(0.1))
1155        task = self.new_task(
1156            loop,
1157            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1158
1159        done, pending = loop.run_until_complete(task)
1160        self.assertEqual({b}, done)
1161        self.assertEqual({a}, pending)
1162        self.assertFalse(a.done())
1163        self.assertTrue(b.done())
1164        self.assertIsNone(b.result())
1165        self.assertAlmostEqual(0.1, loop.time())
1166
1167        # move forward to close generator
1168        loop.advance_time(10)
1169        loop.run_until_complete(asyncio.wait([a, b]))
1170
1171    def test_wait_really_done(self):
1172        # there is possibility that some tasks in the pending list
1173        # became done but their callbacks haven't all been called yet
1174
1175        async def coro1():
1176            await asyncio.sleep(0)
1177
1178        async def coro2():
1179            await asyncio.sleep(0)
1180            await asyncio.sleep(0)
1181
1182        a = self.new_task(self.loop, coro1())
1183        b = self.new_task(self.loop, coro2())
1184        task = self.new_task(
1185            self.loop,
1186            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1187
1188        done, pending = self.loop.run_until_complete(task)
1189        self.assertEqual({a, b}, done)
1190        self.assertTrue(a.done())
1191        self.assertIsNone(a.result())
1192        self.assertTrue(b.done())
1193        self.assertIsNone(b.result())
1194
1195    def test_wait_first_exception(self):
1196
1197        def gen():
1198            when = yield
1199            self.assertAlmostEqual(10.0, when)
1200            yield 0
1201
1202        loop = self.new_test_loop(gen)
1203
1204        # first_exception, task already has exception
1205        a = self.new_task(loop, asyncio.sleep(10.0))
1206
1207        async def exc():
1208            raise ZeroDivisionError('err')
1209
1210        b = self.new_task(loop, exc())
1211        task = self.new_task(
1212            loop,
1213            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1214
1215        done, pending = loop.run_until_complete(task)
1216        self.assertEqual({b}, done)
1217        self.assertEqual({a}, pending)
1218        self.assertAlmostEqual(0, loop.time())
1219
1220        # move forward to close generator
1221        loop.advance_time(10)
1222        loop.run_until_complete(asyncio.wait([a, b]))
1223
1224    def test_wait_first_exception_in_wait(self):
1225
1226        def gen():
1227            when = yield
1228            self.assertAlmostEqual(10.0, when)
1229            when = yield 0
1230            self.assertAlmostEqual(0.01, when)
1231            yield 0.01
1232
1233        loop = self.new_test_loop(gen)
1234
1235        # first_exception, exception during waiting
1236        a = self.new_task(loop, asyncio.sleep(10.0))
1237
1238        async def exc():
1239            await asyncio.sleep(0.01)
1240            raise ZeroDivisionError('err')
1241
1242        b = self.new_task(loop, exc())
1243        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1244
1245        done, pending = loop.run_until_complete(task)
1246        self.assertEqual({b}, done)
1247        self.assertEqual({a}, pending)
1248        self.assertAlmostEqual(0.01, loop.time())
1249
1250        # move forward to close generator
1251        loop.advance_time(10)
1252        loop.run_until_complete(asyncio.wait([a, b]))
1253
1254    def test_wait_with_exception(self):
1255
1256        def gen():
1257            when = yield
1258            self.assertAlmostEqual(0.1, when)
1259            when = yield 0
1260            self.assertAlmostEqual(0.15, when)
1261            yield 0.15
1262
1263        loop = self.new_test_loop(gen)
1264
1265        a = self.new_task(loop, asyncio.sleep(0.1))
1266
1267        async def sleeper():
1268            await asyncio.sleep(0.15)
1269            raise ZeroDivisionError('really')
1270
1271        b = self.new_task(loop, sleeper())
1272
1273        async def foo():
1274            done, pending = await asyncio.wait([b, a])
1275            self.assertEqual(len(done), 2)
1276            self.assertEqual(pending, set())
1277            errors = set(f for f in done if f.exception() is not None)
1278            self.assertEqual(len(errors), 1)
1279
1280        loop.run_until_complete(self.new_task(loop, foo()))
1281        self.assertAlmostEqual(0.15, loop.time())
1282
1283        loop.run_until_complete(self.new_task(loop, foo()))
1284        self.assertAlmostEqual(0.15, loop.time())
1285
1286    def test_wait_with_timeout(self):
1287
1288        def gen():
1289            when = yield
1290            self.assertAlmostEqual(0.1, when)
1291            when = yield 0
1292            self.assertAlmostEqual(0.15, when)
1293            when = yield 0
1294            self.assertAlmostEqual(0.11, when)
1295            yield 0.11
1296
1297        loop = self.new_test_loop(gen)
1298
1299        a = self.new_task(loop, asyncio.sleep(0.1))
1300        b = self.new_task(loop, asyncio.sleep(0.15))
1301
1302        async def foo():
1303            done, pending = await asyncio.wait([b, a], timeout=0.11)
1304            self.assertEqual(done, set([a]))
1305            self.assertEqual(pending, set([b]))
1306
1307        loop.run_until_complete(self.new_task(loop, foo()))
1308        self.assertAlmostEqual(0.11, loop.time())
1309
1310        # move forward to close generator
1311        loop.advance_time(10)
1312        loop.run_until_complete(asyncio.wait([a, b]))
1313
1314    def test_wait_concurrent_complete(self):
1315
1316        def gen():
1317            when = yield
1318            self.assertAlmostEqual(0.1, when)
1319            when = yield 0
1320            self.assertAlmostEqual(0.15, when)
1321            when = yield 0
1322            self.assertAlmostEqual(0.1, when)
1323            yield 0.1
1324
1325        loop = self.new_test_loop(gen)
1326
1327        a = self.new_task(loop, asyncio.sleep(0.1))
1328        b = self.new_task(loop, asyncio.sleep(0.15))
1329
1330        done, pending = loop.run_until_complete(
1331            asyncio.wait([b, a], timeout=0.1))
1332
1333        self.assertEqual(done, set([a]))
1334        self.assertEqual(pending, set([b]))
1335        self.assertAlmostEqual(0.1, loop.time())
1336
1337        # move forward to close generator
1338        loop.advance_time(10)
1339        loop.run_until_complete(asyncio.wait([a, b]))
1340
1341    def test_wait_with_iterator_of_tasks(self):
1342
1343        def gen():
1344            when = yield
1345            self.assertAlmostEqual(0.1, when)
1346            when = yield 0
1347            self.assertAlmostEqual(0.15, when)
1348            yield 0.15
1349
1350        loop = self.new_test_loop(gen)
1351
1352        a = self.new_task(loop, asyncio.sleep(0.1))
1353        b = self.new_task(loop, asyncio.sleep(0.15))
1354
1355        async def foo():
1356            done, pending = await asyncio.wait(iter([b, a]))
1357            self.assertEqual(done, set([a, b]))
1358            self.assertEqual(pending, set())
1359            return 42
1360
1361        res = loop.run_until_complete(self.new_task(loop, foo()))
1362        self.assertEqual(res, 42)
1363        self.assertAlmostEqual(0.15, loop.time())
1364
1365    def test_as_completed(self):
1366
1367        def gen():
1368            yield 0
1369            yield 0
1370            yield 0.01
1371            yield 0
1372
1373        loop = self.new_test_loop(gen)
1374        # disable "slow callback" warning
1375        loop.slow_callback_duration = 1.0
1376        completed = set()
1377        time_shifted = False
1378
1379        async def sleeper(dt, x):
1380            nonlocal time_shifted
1381            await asyncio.sleep(dt)
1382            completed.add(x)
1383            if not time_shifted and 'a' in completed and 'b' in completed:
1384                time_shifted = True
1385                loop.advance_time(0.14)
1386            return x
1387
1388        a = sleeper(0.01, 'a')
1389        b = sleeper(0.01, 'b')
1390        c = sleeper(0.15, 'c')
1391
1392        async def foo():
1393            values = []
1394            for f in asyncio.as_completed([b, c, a]):
1395                values.append(await f)
1396            return values
1397
1398        res = loop.run_until_complete(self.new_task(loop, foo()))
1399        self.assertAlmostEqual(0.15, loop.time())
1400        self.assertTrue('a' in res[:2])
1401        self.assertTrue('b' in res[:2])
1402        self.assertEqual(res[2], 'c')
1403
1404    def test_as_completed_with_timeout(self):
1405
1406        def gen():
1407            yield
1408            yield 0
1409            yield 0
1410            yield 0.1
1411
1412        loop = self.new_test_loop(gen)
1413
1414        a = loop.create_task(asyncio.sleep(0.1, 'a'))
1415        b = loop.create_task(asyncio.sleep(0.15, 'b'))
1416
1417        async def foo():
1418            values = []
1419            for f in asyncio.as_completed([a, b], timeout=0.12):
1420                if values:
1421                    loop.advance_time(0.02)
1422                try:
1423                    v = await f
1424                    values.append((1, v))
1425                except asyncio.TimeoutError as exc:
1426                    values.append((2, exc))
1427            return values
1428
1429        res = loop.run_until_complete(self.new_task(loop, foo()))
1430        self.assertEqual(len(res), 2, res)
1431        self.assertEqual(res[0], (1, 'a'))
1432        self.assertEqual(res[1][0], 2)
1433        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1434        self.assertAlmostEqual(0.12, loop.time())
1435
1436        # move forward to close generator
1437        loop.advance_time(10)
1438        loop.run_until_complete(asyncio.wait([a, b]))
1439
1440    def test_as_completed_with_unused_timeout(self):
1441
1442        def gen():
1443            yield
1444            yield 0
1445            yield 0.01
1446
1447        loop = self.new_test_loop(gen)
1448
1449        a = asyncio.sleep(0.01, 'a')
1450
1451        async def foo():
1452            for f in asyncio.as_completed([a], timeout=1):
1453                v = await f
1454                self.assertEqual(v, 'a')
1455
1456        loop.run_until_complete(self.new_task(loop, foo()))
1457
1458    def test_as_completed_reverse_wait(self):
1459
1460        def gen():
1461            yield 0
1462            yield 0.05
1463            yield 0
1464
1465        loop = self.new_test_loop(gen)
1466
1467        a = asyncio.sleep(0.05, 'a')
1468        b = asyncio.sleep(0.10, 'b')
1469        fs = {a, b}
1470
1471        async def test():
1472            futs = list(asyncio.as_completed(fs))
1473            self.assertEqual(len(futs), 2)
1474
1475            x = await futs[1]
1476            self.assertEqual(x, 'a')
1477            self.assertAlmostEqual(0.05, loop.time())
1478            loop.advance_time(0.05)
1479            y = await futs[0]
1480            self.assertEqual(y, 'b')
1481            self.assertAlmostEqual(0.10, loop.time())
1482
1483        loop.run_until_complete(test())
1484
1485    def test_as_completed_concurrent(self):
1486
1487        def gen():
1488            when = yield
1489            self.assertAlmostEqual(0.05, when)
1490            when = yield 0
1491            self.assertAlmostEqual(0.05, when)
1492            yield 0.05
1493
1494        a = asyncio.sleep(0.05, 'a')
1495        b = asyncio.sleep(0.05, 'b')
1496        fs = {a, b}
1497
1498        async def test():
1499            futs = list(asyncio.as_completed(fs))
1500            self.assertEqual(len(futs), 2)
1501            done, pending = await asyncio.wait(
1502                [asyncio.ensure_future(fut) for fut in futs]
1503            )
1504            self.assertEqual(set(f.result() for f in done), {'a', 'b'})
1505
1506        loop = self.new_test_loop(gen)
1507        loop.run_until_complete(test())
1508
1509    def test_as_completed_duplicate_coroutines(self):
1510
1511        async def coro(s):
1512            return s
1513
1514        async def runner():
1515            result = []
1516            c = coro('ham')
1517            for f in asyncio.as_completed([c, c, coro('spam')]):
1518                result.append(await f)
1519            return result
1520
1521        fut = self.new_task(self.loop, runner())
1522        self.loop.run_until_complete(fut)
1523        result = fut.result()
1524        self.assertEqual(set(result), {'ham', 'spam'})
1525        self.assertEqual(len(result), 2)
1526
1527    def test_as_completed_coroutine_without_loop(self):
1528        async def coro():
1529            return 42
1530
1531        a = coro()
1532        self.addCleanup(a.close)
1533
1534        futs = asyncio.as_completed([a])
1535        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
1536            list(futs)
1537
1538    def test_as_completed_coroutine_use_running_loop(self):
1539        loop = self.new_test_loop()
1540
1541        async def coro():
1542            return 42
1543
1544        async def test():
1545            futs = list(asyncio.as_completed([coro()]))
1546            self.assertEqual(len(futs), 1)
1547            self.assertEqual(await futs[0], 42)
1548
1549        loop.run_until_complete(test())
1550
1551    def test_sleep(self):
1552
1553        def gen():
1554            when = yield
1555            self.assertAlmostEqual(0.05, when)
1556            when = yield 0.05
1557            self.assertAlmostEqual(0.1, when)
1558            yield 0.05
1559
1560        loop = self.new_test_loop(gen)
1561
1562        async def sleeper(dt, arg):
1563            await asyncio.sleep(dt/2)
1564            res = await asyncio.sleep(dt/2, arg)
1565            return res
1566
1567        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1568        loop.run_until_complete(t)
1569        self.assertTrue(t.done())
1570        self.assertEqual(t.result(), 'yeah')
1571        self.assertAlmostEqual(0.1, loop.time())
1572
1573    def test_sleep_cancel(self):
1574
1575        def gen():
1576            when = yield
1577            self.assertAlmostEqual(10.0, when)
1578            yield 0
1579
1580        loop = self.new_test_loop(gen)
1581
1582        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1583
1584        handle = None
1585        orig_call_later = loop.call_later
1586
1587        def call_later(delay, callback, *args):
1588            nonlocal handle
1589            handle = orig_call_later(delay, callback, *args)
1590            return handle
1591
1592        loop.call_later = call_later
1593        test_utils.run_briefly(loop)
1594
1595        self.assertFalse(handle._cancelled)
1596
1597        t.cancel()
1598        test_utils.run_briefly(loop)
1599        self.assertTrue(handle._cancelled)
1600
1601    def test_task_cancel_sleeping_task(self):
1602
1603        def gen():
1604            when = yield
1605            self.assertAlmostEqual(0.1, when)
1606            when = yield 0
1607            self.assertAlmostEqual(5000, when)
1608            yield 0.1
1609
1610        loop = self.new_test_loop(gen)
1611
1612        async def sleep(dt):
1613            await asyncio.sleep(dt)
1614
1615        async def doit():
1616            sleeper = self.new_task(loop, sleep(5000))
1617            loop.call_later(0.1, sleeper.cancel)
1618            try:
1619                await sleeper
1620            except asyncio.CancelledError:
1621                return 'cancelled'
1622            else:
1623                return 'slept in'
1624
1625        doer = doit()
1626        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1627        self.assertAlmostEqual(0.1, loop.time())
1628
1629    def test_task_cancel_waiter_future(self):
1630        fut = self.new_future(self.loop)
1631
1632        async def coro():
1633            await fut
1634
1635        task = self.new_task(self.loop, coro())
1636        test_utils.run_briefly(self.loop)
1637        self.assertIs(task._fut_waiter, fut)
1638
1639        task.cancel()
1640        test_utils.run_briefly(self.loop)
1641        self.assertRaises(
1642            asyncio.CancelledError, self.loop.run_until_complete, task)
1643        self.assertIsNone(task._fut_waiter)
1644        self.assertTrue(fut.cancelled())
1645
1646    def test_task_set_methods(self):
1647        async def notmuch():
1648            return 'ko'
1649
1650        gen = notmuch()
1651        task = self.new_task(self.loop, gen)
1652
1653        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1654            task.set_result('ok')
1655
1656        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1657            task.set_exception(ValueError())
1658
1659        self.assertEqual(
1660            self.loop.run_until_complete(task),
1661            'ko')
1662
1663    def test_step_result_future(self):
1664        # If coroutine returns future, task waits on this future.
1665
1666        class Fut(asyncio.Future):
1667            def __init__(self, *args, **kwds):
1668                self.cb_added = False
1669                super().__init__(*args, **kwds)
1670
1671            def add_done_callback(self, *args, **kwargs):
1672                self.cb_added = True
1673                super().add_done_callback(*args, **kwargs)
1674
1675        fut = Fut(loop=self.loop)
1676        result = None
1677
1678        async def wait_for_future():
1679            nonlocal result
1680            result = await fut
1681
1682        t = self.new_task(self.loop, wait_for_future())
1683        test_utils.run_briefly(self.loop)
1684        self.assertTrue(fut.cb_added)
1685
1686        res = object()
1687        fut.set_result(res)
1688        test_utils.run_briefly(self.loop)
1689        self.assertIs(res, result)
1690        self.assertTrue(t.done())
1691        self.assertIsNone(t.result())
1692
1693    def test_baseexception_during_cancel(self):
1694
1695        def gen():
1696            when = yield
1697            self.assertAlmostEqual(10.0, when)
1698            yield 0
1699
1700        loop = self.new_test_loop(gen)
1701
1702        async def sleeper():
1703            await asyncio.sleep(10)
1704
1705        base_exc = SystemExit()
1706
1707        async def notmutch():
1708            try:
1709                await sleeper()
1710            except asyncio.CancelledError:
1711                raise base_exc
1712
1713        task = self.new_task(loop, notmutch())
1714        test_utils.run_briefly(loop)
1715
1716        task.cancel()
1717        self.assertFalse(task.done())
1718
1719        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1720
1721        self.assertTrue(task.done())
1722        self.assertFalse(task.cancelled())
1723        self.assertIs(task.exception(), base_exc)
1724
1725    def test_iscoroutinefunction(self):
1726        def fn():
1727            pass
1728
1729        self.assertFalse(asyncio.iscoroutinefunction(fn))
1730
1731        def fn1():
1732            yield
1733        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1734
1735        async def fn2():
1736            pass
1737        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1738
1739        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1740        self.assertTrue(asyncio.iscoroutinefunction(mock.AsyncMock()))
1741
1742    def test_coroutine_non_gen_function(self):
1743        async def func():
1744            return 'test'
1745
1746        self.assertTrue(asyncio.iscoroutinefunction(func))
1747
1748        coro = func()
1749        self.assertTrue(asyncio.iscoroutine(coro))
1750
1751        res = self.loop.run_until_complete(coro)
1752        self.assertEqual(res, 'test')
1753
1754    def test_coroutine_non_gen_function_return_future(self):
1755        fut = self.new_future(self.loop)
1756
1757        async def func():
1758            return fut
1759
1760        async def coro():
1761            fut.set_result('test')
1762
1763        t1 = self.new_task(self.loop, func())
1764        t2 = self.new_task(self.loop, coro())
1765        res = self.loop.run_until_complete(t1)
1766        self.assertEqual(res, fut)
1767        self.assertIsNone(t2.result())
1768
1769    def test_current_task(self):
1770        self.assertIsNone(asyncio.current_task(loop=self.loop))
1771
1772        async def coro(loop):
1773            self.assertIs(asyncio.current_task(), task)
1774
1775            self.assertIs(asyncio.current_task(None), task)
1776            self.assertIs(asyncio.current_task(), task)
1777
1778        task = self.new_task(self.loop, coro(self.loop))
1779        self.loop.run_until_complete(task)
1780        self.assertIsNone(asyncio.current_task(loop=self.loop))
1781
1782    def test_current_task_with_interleaving_tasks(self):
1783        self.assertIsNone(asyncio.current_task(loop=self.loop))
1784
1785        fut1 = self.new_future(self.loop)
1786        fut2 = self.new_future(self.loop)
1787
1788        async def coro1(loop):
1789            self.assertTrue(asyncio.current_task() is task1)
1790            await fut1
1791            self.assertTrue(asyncio.current_task() is task1)
1792            fut2.set_result(True)
1793
1794        async def coro2(loop):
1795            self.assertTrue(asyncio.current_task() is task2)
1796            fut1.set_result(True)
1797            await fut2
1798            self.assertTrue(asyncio.current_task() is task2)
1799
1800        task1 = self.new_task(self.loop, coro1(self.loop))
1801        task2 = self.new_task(self.loop, coro2(self.loop))
1802
1803        self.loop.run_until_complete(asyncio.wait((task1, task2)))
1804        self.assertIsNone(asyncio.current_task(loop=self.loop))
1805
1806    # Some thorough tests for cancellation propagation through
1807    # coroutines, tasks and wait().
1808
1809    def test_yield_future_passes_cancel(self):
1810        # Cancelling outer() cancels inner() cancels waiter.
1811        proof = 0
1812        waiter = self.new_future(self.loop)
1813
1814        async def inner():
1815            nonlocal proof
1816            try:
1817                await waiter
1818            except asyncio.CancelledError:
1819                proof += 1
1820                raise
1821            else:
1822                self.fail('got past sleep() in inner()')
1823
1824        async def outer():
1825            nonlocal proof
1826            try:
1827                await inner()
1828            except asyncio.CancelledError:
1829                proof += 100  # Expect this path.
1830            else:
1831                proof += 10
1832
1833        f = asyncio.ensure_future(outer(), loop=self.loop)
1834        test_utils.run_briefly(self.loop)
1835        f.cancel()
1836        self.loop.run_until_complete(f)
1837        self.assertEqual(proof, 101)
1838        self.assertTrue(waiter.cancelled())
1839
1840    def test_yield_wait_does_not_shield_cancel(self):
1841        # Cancelling outer() makes wait() return early, leaves inner()
1842        # running.
1843        proof = 0
1844        waiter = self.new_future(self.loop)
1845
1846        async def inner():
1847            nonlocal proof
1848            await waiter
1849            proof += 1
1850
1851        async def outer():
1852            nonlocal proof
1853            with self.assertWarns(DeprecationWarning):
1854                d, p = await asyncio.wait([asyncio.create_task(inner())])
1855            proof += 100
1856
1857        f = asyncio.ensure_future(outer(), loop=self.loop)
1858        test_utils.run_briefly(self.loop)
1859        f.cancel()
1860        self.assertRaises(
1861            asyncio.CancelledError, self.loop.run_until_complete, f)
1862        waiter.set_result(None)
1863        test_utils.run_briefly(self.loop)
1864        self.assertEqual(proof, 1)
1865
1866    def test_shield_result(self):
1867        inner = self.new_future(self.loop)
1868        outer = asyncio.shield(inner)
1869        inner.set_result(42)
1870        res = self.loop.run_until_complete(outer)
1871        self.assertEqual(res, 42)
1872
1873    def test_shield_exception(self):
1874        inner = self.new_future(self.loop)
1875        outer = asyncio.shield(inner)
1876        test_utils.run_briefly(self.loop)
1877        exc = RuntimeError('expected')
1878        inner.set_exception(exc)
1879        test_utils.run_briefly(self.loop)
1880        self.assertIs(outer.exception(), exc)
1881
1882    def test_shield_cancel_inner(self):
1883        inner = self.new_future(self.loop)
1884        outer = asyncio.shield(inner)
1885        test_utils.run_briefly(self.loop)
1886        inner.cancel()
1887        test_utils.run_briefly(self.loop)
1888        self.assertTrue(outer.cancelled())
1889
1890    def test_shield_cancel_outer(self):
1891        inner = self.new_future(self.loop)
1892        outer = asyncio.shield(inner)
1893        test_utils.run_briefly(self.loop)
1894        outer.cancel()
1895        test_utils.run_briefly(self.loop)
1896        self.assertTrue(outer.cancelled())
1897        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
1898
1899    def test_shield_shortcut(self):
1900        fut = self.new_future(self.loop)
1901        fut.set_result(42)
1902        res = self.loop.run_until_complete(asyncio.shield(fut))
1903        self.assertEqual(res, 42)
1904
1905    def test_shield_effect(self):
1906        # Cancelling outer() does not affect inner().
1907        proof = 0
1908        waiter = self.new_future(self.loop)
1909
1910        async def inner():
1911            nonlocal proof
1912            await waiter
1913            proof += 1
1914
1915        async def outer():
1916            nonlocal proof
1917            await asyncio.shield(inner())
1918            proof += 100
1919
1920        f = asyncio.ensure_future(outer(), loop=self.loop)
1921        test_utils.run_briefly(self.loop)
1922        f.cancel()
1923        with self.assertRaises(asyncio.CancelledError):
1924            self.loop.run_until_complete(f)
1925        waiter.set_result(None)
1926        test_utils.run_briefly(self.loop)
1927        self.assertEqual(proof, 1)
1928
1929    def test_shield_gather(self):
1930        child1 = self.new_future(self.loop)
1931        child2 = self.new_future(self.loop)
1932        parent = asyncio.gather(child1, child2)
1933        outer = asyncio.shield(parent)
1934        test_utils.run_briefly(self.loop)
1935        outer.cancel()
1936        test_utils.run_briefly(self.loop)
1937        self.assertTrue(outer.cancelled())
1938        child1.set_result(1)
1939        child2.set_result(2)
1940        test_utils.run_briefly(self.loop)
1941        self.assertEqual(parent.result(), [1, 2])
1942
1943    def test_gather_shield(self):
1944        child1 = self.new_future(self.loop)
1945        child2 = self.new_future(self.loop)
1946        inner1 = asyncio.shield(child1)
1947        inner2 = asyncio.shield(child2)
1948        parent = asyncio.gather(inner1, inner2)
1949        test_utils.run_briefly(self.loop)
1950        parent.cancel()
1951        # This should cancel inner1 and inner2 but bot child1 and child2.
1952        test_utils.run_briefly(self.loop)
1953        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
1954        self.assertTrue(inner1.cancelled())
1955        self.assertTrue(inner2.cancelled())
1956        child1.set_result(1)
1957        child2.set_result(2)
1958        test_utils.run_briefly(self.loop)
1959
1960    def test_shield_coroutine_without_loop(self):
1961        async def coro():
1962            return 42
1963
1964        inner = coro()
1965        self.addCleanup(inner.close)
1966        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
1967            asyncio.shield(inner)
1968
1969    def test_shield_coroutine_use_running_loop(self):
1970        async def coro():
1971            return 42
1972
1973        async def test():
1974            return asyncio.shield(coro())
1975        outer = self.loop.run_until_complete(test())
1976        self.assertEqual(outer._loop, self.loop)
1977        res = self.loop.run_until_complete(outer)
1978        self.assertEqual(res, 42)
1979
1980    def test_shield_coroutine_use_global_loop(self):
1981        # Deprecated in 3.10, undeprecated in 3.11.1
1982        async def coro():
1983            return 42
1984
1985        asyncio.set_event_loop(self.loop)
1986        self.addCleanup(asyncio.set_event_loop, None)
1987        outer = asyncio.shield(coro())
1988        self.assertEqual(outer._loop, self.loop)
1989        res = self.loop.run_until_complete(outer)
1990        self.assertEqual(res, 42)
1991
1992    def test_as_completed_invalid_args(self):
1993        fut = self.new_future(self.loop)
1994
1995        # as_completed() expects a list of futures, not a future instance
1996        self.assertRaises(TypeError, self.loop.run_until_complete,
1997            asyncio.as_completed(fut))
1998        coro = coroutine_function()
1999        self.assertRaises(TypeError, self.loop.run_until_complete,
2000            asyncio.as_completed(coro))
2001        coro.close()
2002
2003    def test_wait_invalid_args(self):
2004        fut = self.new_future(self.loop)
2005
2006        # wait() expects a list of futures, not a future instance
2007        self.assertRaises(TypeError, self.loop.run_until_complete,
2008            asyncio.wait(fut))
2009        coro = coroutine_function()
2010        self.assertRaises(TypeError, self.loop.run_until_complete,
2011            asyncio.wait(coro))
2012        coro.close()
2013
2014        # wait() expects at least a future
2015        self.assertRaises(ValueError, self.loop.run_until_complete,
2016            asyncio.wait([]))
2017
2018    def test_log_destroyed_pending_task(self):
2019        Task = self.__class__.Task
2020
2021        async def kill_me(loop):
2022            future = self.new_future(loop)
2023            await future
2024            # at this point, the only reference to kill_me() task is
2025            # the Task._wakeup() method in future._callbacks
2026            raise Exception("code never reached")
2027
2028        mock_handler = mock.Mock()
2029        self.loop.set_debug(True)
2030        self.loop.set_exception_handler(mock_handler)
2031
2032        # schedule the task
2033        coro = kill_me(self.loop)
2034        task = asyncio.ensure_future(coro, loop=self.loop)
2035
2036        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
2037
2038        asyncio.set_event_loop(None)
2039
2040        # execute the task so it waits for future
2041        self.loop._run_once()
2042        self.assertEqual(len(self.loop._ready), 0)
2043
2044        coro = None
2045        source_traceback = task._source_traceback
2046        task = None
2047
2048        # no more reference to kill_me() task: the task is destroyed by the GC
2049        support.gc_collect()
2050
2051        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
2052
2053        mock_handler.assert_called_with(self.loop, {
2054            'message': 'Task was destroyed but it is pending!',
2055            'task': mock.ANY,
2056            'source_traceback': source_traceback,
2057        })
2058        mock_handler.reset_mock()
2059
2060    @mock.patch('asyncio.base_events.logger')
2061    def test_tb_logger_not_called_after_cancel(self, m_log):
2062        loop = asyncio.new_event_loop()
2063        self.set_event_loop(loop)
2064
2065        async def coro():
2066            raise TypeError
2067
2068        async def runner():
2069            task = self.new_task(loop, coro())
2070            await asyncio.sleep(0.05)
2071            task.cancel()
2072            task = None
2073
2074        loop.run_until_complete(runner())
2075        self.assertFalse(m_log.error.called)
2076
2077    def test_task_source_traceback(self):
2078        self.loop.set_debug(True)
2079
2080        task = self.new_task(self.loop, coroutine_function())
2081        lineno = sys._getframe().f_lineno - 1
2082        self.assertIsInstance(task._source_traceback, list)
2083        self.assertEqual(task._source_traceback[-2][:3],
2084                         (__file__,
2085                          lineno,
2086                          'test_task_source_traceback'))
2087        self.loop.run_until_complete(task)
2088
2089    def test_cancel_gather_1(self):
2090        """Ensure that a gathering future refuses to be cancelled once all
2091        children are done"""
2092        loop = asyncio.new_event_loop()
2093        self.addCleanup(loop.close)
2094
2095        fut = self.new_future(loop)
2096        async def create():
2097            # The indirection fut->child_coro is needed since otherwise the
2098            # gathering task is done at the same time as the child future
2099            def child_coro():
2100                return (yield from fut)
2101            gather_future = asyncio.gather(child_coro())
2102            return asyncio.ensure_future(gather_future)
2103        gather_task = loop.run_until_complete(create())
2104
2105        cancel_result = None
2106        def cancelling_callback(_):
2107            nonlocal cancel_result
2108            cancel_result = gather_task.cancel()
2109        fut.add_done_callback(cancelling_callback)
2110
2111        fut.set_result(42) # calls the cancelling_callback after fut is done()
2112
2113        # At this point the task should complete.
2114        loop.run_until_complete(gather_task)
2115
2116        # Python issue #26923: asyncio.gather drops cancellation
2117        self.assertEqual(cancel_result, False)
2118        self.assertFalse(gather_task.cancelled())
2119        self.assertEqual(gather_task.result(), [42])
2120
2121    def test_cancel_gather_2(self):
2122        cases = [
2123            ((), ()),
2124            ((None,), ()),
2125            (('my message',), ('my message',)),
2126            # Non-string values should roundtrip.
2127            ((5,), (5,)),
2128        ]
2129        for cancel_args, expected_args in cases:
2130            with self.subTest(cancel_args=cancel_args):
2131                loop = asyncio.new_event_loop()
2132                self.addCleanup(loop.close)
2133
2134                async def test():
2135                    time = 0
2136                    while True:
2137                        time += 0.05
2138                        await asyncio.gather(asyncio.sleep(0.05),
2139                                             return_exceptions=True)
2140                        if time > 1:
2141                            return
2142
2143                async def main():
2144                    qwe = self.new_task(loop, test())
2145                    await asyncio.sleep(0.2)
2146                    qwe.cancel(*cancel_args)
2147                    await qwe
2148
2149                try:
2150                    loop.run_until_complete(main())
2151                except asyncio.CancelledError as exc:
2152                    self.assertEqual(exc.args, expected_args)
2153                    actual = get_innermost_context(exc)
2154                    self.assertEqual(
2155                        actual,
2156                        (asyncio.CancelledError, expected_args, 0),
2157                    )
2158                else:
2159                    self.fail(
2160                        'gather() does not propagate CancelledError '
2161                        'raised by inner task to the gather() caller.'
2162                    )
2163
2164    def test_exception_traceback(self):
2165        # See http://bugs.python.org/issue28843
2166
2167        async def foo():
2168            1 / 0
2169
2170        async def main():
2171            task = self.new_task(self.loop, foo())
2172            await asyncio.sleep(0)  # skip one loop iteration
2173            self.assertIsNotNone(task.exception().__traceback__)
2174
2175        self.loop.run_until_complete(main())
2176
2177    @mock.patch('asyncio.base_events.logger')
2178    def test_error_in_call_soon(self, m_log):
2179        def call_soon(callback, *args, **kwargs):
2180            raise ValueError
2181        self.loop.call_soon = call_soon
2182
2183        async def coro():
2184            pass
2185
2186        self.assertFalse(m_log.error.called)
2187
2188        with self.assertRaises(ValueError):
2189            gen = coro()
2190            try:
2191                self.new_task(self.loop, gen)
2192            finally:
2193                gen.close()
2194        gc.collect()  # For PyPy or other GCs.
2195
2196        self.assertTrue(m_log.error.called)
2197        message = m_log.error.call_args[0][0]
2198        self.assertIn('Task was destroyed but it is pending', message)
2199
2200        self.assertEqual(asyncio.all_tasks(self.loop), set())
2201
2202    def test_create_task_with_noncoroutine(self):
2203        with self.assertRaisesRegex(TypeError,
2204                                    "a coroutine was expected, got 123"):
2205            self.new_task(self.loop, 123)
2206
2207        # test it for the second time to ensure that caching
2208        # in asyncio.iscoroutine() doesn't break things.
2209        with self.assertRaisesRegex(TypeError,
2210                                    "a coroutine was expected, got 123"):
2211            self.new_task(self.loop, 123)
2212
2213    def test_create_task_with_async_function(self):
2214
2215        async def coro():
2216            pass
2217
2218        task = self.new_task(self.loop, coro())
2219        self.assertIsInstance(task, self.Task)
2220        self.loop.run_until_complete(task)
2221
2222        # test it for the second time to ensure that caching
2223        # in asyncio.iscoroutine() doesn't break things.
2224        task = self.new_task(self.loop, coro())
2225        self.assertIsInstance(task, self.Task)
2226        self.loop.run_until_complete(task)
2227
2228    def test_create_task_with_asynclike_function(self):
2229        task = self.new_task(self.loop, CoroLikeObject())
2230        self.assertIsInstance(task, self.Task)
2231        self.assertEqual(self.loop.run_until_complete(task), 42)
2232
2233        # test it for the second time to ensure that caching
2234        # in asyncio.iscoroutine() doesn't break things.
2235        task = self.new_task(self.loop, CoroLikeObject())
2236        self.assertIsInstance(task, self.Task)
2237        self.assertEqual(self.loop.run_until_complete(task), 42)
2238
2239    def test_bare_create_task(self):
2240
2241        async def inner():
2242            return 1
2243
2244        async def coro():
2245            task = asyncio.create_task(inner())
2246            self.assertIsInstance(task, self.Task)
2247            ret = await task
2248            self.assertEqual(1, ret)
2249
2250        self.loop.run_until_complete(coro())
2251
2252    def test_bare_create_named_task(self):
2253
2254        async def coro_noop():
2255            pass
2256
2257        async def coro():
2258            task = asyncio.create_task(coro_noop(), name='No-op')
2259            self.assertEqual(task.get_name(), 'No-op')
2260            await task
2261
2262        self.loop.run_until_complete(coro())
2263
2264    def test_context_1(self):
2265        cvar = contextvars.ContextVar('cvar', default='nope')
2266
2267        async def sub():
2268            await asyncio.sleep(0.01)
2269            self.assertEqual(cvar.get(), 'nope')
2270            cvar.set('something else')
2271
2272        async def main():
2273            self.assertEqual(cvar.get(), 'nope')
2274            subtask = self.new_task(loop, sub())
2275            cvar.set('yes')
2276            self.assertEqual(cvar.get(), 'yes')
2277            await subtask
2278            self.assertEqual(cvar.get(), 'yes')
2279
2280        loop = asyncio.new_event_loop()
2281        try:
2282            task = self.new_task(loop, main())
2283            loop.run_until_complete(task)
2284        finally:
2285            loop.close()
2286
2287    def test_context_2(self):
2288        cvar = contextvars.ContextVar('cvar', default='nope')
2289
2290        async def main():
2291            def fut_on_done(fut):
2292                # This change must not pollute the context
2293                # of the "main()" task.
2294                cvar.set('something else')
2295
2296            self.assertEqual(cvar.get(), 'nope')
2297
2298            for j in range(2):
2299                fut = self.new_future(loop)
2300                fut.add_done_callback(fut_on_done)
2301                cvar.set(f'yes{j}')
2302                loop.call_soon(fut.set_result, None)
2303                await fut
2304                self.assertEqual(cvar.get(), f'yes{j}')
2305
2306                for i in range(3):
2307                    # Test that task passed its context to add_done_callback:
2308                    cvar.set(f'yes{i}-{j}')
2309                    await asyncio.sleep(0.001)
2310                    self.assertEqual(cvar.get(), f'yes{i}-{j}')
2311
2312        loop = asyncio.new_event_loop()
2313        try:
2314            task = self.new_task(loop, main())
2315            loop.run_until_complete(task)
2316        finally:
2317            loop.close()
2318
2319        self.assertEqual(cvar.get(), 'nope')
2320
2321    def test_context_3(self):
2322        # Run 100 Tasks in parallel, each modifying cvar.
2323
2324        cvar = contextvars.ContextVar('cvar', default=-1)
2325
2326        async def sub(num):
2327            for i in range(10):
2328                cvar.set(num + i)
2329                await asyncio.sleep(random.uniform(0.001, 0.05))
2330                self.assertEqual(cvar.get(), num + i)
2331
2332        async def main():
2333            tasks = []
2334            for i in range(100):
2335                task = loop.create_task(sub(random.randint(0, 10)))
2336                tasks.append(task)
2337
2338            await asyncio.gather(*tasks)
2339
2340        loop = asyncio.new_event_loop()
2341        try:
2342            loop.run_until_complete(main())
2343        finally:
2344            loop.close()
2345
2346        self.assertEqual(cvar.get(), -1)
2347
2348    def test_context_4(self):
2349        cvar = contextvars.ContextVar('cvar')
2350
2351        async def coro(val):
2352            await asyncio.sleep(0)
2353            cvar.set(val)
2354
2355        async def main():
2356            ret = []
2357            ctx = contextvars.copy_context()
2358            ret.append(ctx.get(cvar))
2359            t1 = self.new_task(loop, coro(1), context=ctx)
2360            await t1
2361            ret.append(ctx.get(cvar))
2362            t2 = self.new_task(loop, coro(2), context=ctx)
2363            await t2
2364            ret.append(ctx.get(cvar))
2365            return ret
2366
2367        loop = asyncio.new_event_loop()
2368        try:
2369            task = self.new_task(loop, main())
2370            ret = loop.run_until_complete(task)
2371        finally:
2372            loop.close()
2373
2374        self.assertEqual([None, 1, 2], ret)
2375
2376    def test_context_5(self):
2377        cvar = contextvars.ContextVar('cvar')
2378
2379        async def coro(val):
2380            await asyncio.sleep(0)
2381            cvar.set(val)
2382
2383        async def main():
2384            ret = []
2385            ctx = contextvars.copy_context()
2386            ret.append(ctx.get(cvar))
2387            t1 = asyncio.create_task(coro(1), context=ctx)
2388            await t1
2389            ret.append(ctx.get(cvar))
2390            t2 = asyncio.create_task(coro(2), context=ctx)
2391            await t2
2392            ret.append(ctx.get(cvar))
2393            return ret
2394
2395        loop = asyncio.new_event_loop()
2396        try:
2397            task = self.new_task(loop, main())
2398            ret = loop.run_until_complete(task)
2399        finally:
2400            loop.close()
2401
2402        self.assertEqual([None, 1, 2], ret)
2403
2404    def test_context_6(self):
2405        cvar = contextvars.ContextVar('cvar')
2406
2407        async def coro(val):
2408            await asyncio.sleep(0)
2409            cvar.set(val)
2410
2411        async def main():
2412            ret = []
2413            ctx = contextvars.copy_context()
2414            ret.append(ctx.get(cvar))
2415            t1 = loop.create_task(coro(1), context=ctx)
2416            await t1
2417            ret.append(ctx.get(cvar))
2418            t2 = loop.create_task(coro(2), context=ctx)
2419            await t2
2420            ret.append(ctx.get(cvar))
2421            return ret
2422
2423        loop = asyncio.new_event_loop()
2424        try:
2425            task = loop.create_task(main())
2426            ret = loop.run_until_complete(task)
2427        finally:
2428            loop.close()
2429
2430        self.assertEqual([None, 1, 2], ret)
2431
2432    def test_get_coro(self):
2433        loop = asyncio.new_event_loop()
2434        coro = coroutine_function()
2435        try:
2436            task = self.new_task(loop, coro)
2437            loop.run_until_complete(task)
2438            self.assertIs(task.get_coro(), coro)
2439        finally:
2440            loop.close()
2441
2442
2443def add_subclass_tests(cls):
2444    BaseTask = cls.Task
2445    BaseFuture = cls.Future
2446
2447    if BaseTask is None or BaseFuture is None:
2448        return cls
2449
2450    class CommonFuture:
2451        def __init__(self, *args, **kwargs):
2452            self.calls = collections.defaultdict(lambda: 0)
2453            super().__init__(*args, **kwargs)
2454
2455        def add_done_callback(self, *args, **kwargs):
2456            self.calls['add_done_callback'] += 1
2457            return super().add_done_callback(*args, **kwargs)
2458
2459    class Task(CommonFuture, BaseTask):
2460        pass
2461
2462    class Future(CommonFuture, BaseFuture):
2463        pass
2464
2465    def test_subclasses_ctask_cfuture(self):
2466        fut = self.Future(loop=self.loop)
2467
2468        async def func():
2469            self.loop.call_soon(lambda: fut.set_result('spam'))
2470            return await fut
2471
2472        task = self.Task(func(), loop=self.loop)
2473
2474        result = self.loop.run_until_complete(task)
2475
2476        self.assertEqual(result, 'spam')
2477
2478        self.assertEqual(
2479            dict(task.calls),
2480            {'add_done_callback': 1})
2481
2482        self.assertEqual(
2483            dict(fut.calls),
2484            {'add_done_callback': 1})
2485
2486    # Add patched Task & Future back to the test case
2487    cls.Task = Task
2488    cls.Future = Future
2489
2490    # Add an extra unit-test
2491    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
2492
2493    # Disable the "test_task_source_traceback" test
2494    # (the test is hardcoded for a particular call stack, which
2495    # is slightly different for Task subclasses)
2496    cls.test_task_source_traceback = None
2497
2498    return cls
2499
2500
2501class SetMethodsTest:
2502
2503    def test_set_result_causes_invalid_state(self):
2504        Future = type(self).Future
2505        self.loop.call_exception_handler = exc_handler = mock.Mock()
2506
2507        async def foo():
2508            await asyncio.sleep(0.1)
2509            return 10
2510
2511        coro = foo()
2512        task = self.new_task(self.loop, coro)
2513        Future.set_result(task, 'spam')
2514
2515        self.assertEqual(
2516            self.loop.run_until_complete(task),
2517            'spam')
2518
2519        exc_handler.assert_called_once()
2520        exc = exc_handler.call_args[0][0]['exception']
2521        with self.assertRaisesRegex(asyncio.InvalidStateError,
2522                                    r'step\(\): already done'):
2523            raise exc
2524
2525        coro.close()
2526
2527    def test_set_exception_causes_invalid_state(self):
2528        class MyExc(Exception):
2529            pass
2530
2531        Future = type(self).Future
2532        self.loop.call_exception_handler = exc_handler = mock.Mock()
2533
2534        async def foo():
2535            await asyncio.sleep(0.1)
2536            return 10
2537
2538        coro = foo()
2539        task = self.new_task(self.loop, coro)
2540        Future.set_exception(task, MyExc())
2541
2542        with self.assertRaises(MyExc):
2543            self.loop.run_until_complete(task)
2544
2545        exc_handler.assert_called_once()
2546        exc = exc_handler.call_args[0][0]['exception']
2547        with self.assertRaisesRegex(asyncio.InvalidStateError,
2548                                    r'step\(\): already done'):
2549            raise exc
2550
2551        coro.close()
2552
2553
2554@unittest.skipUnless(hasattr(futures, '_CFuture') and
2555                     hasattr(tasks, '_CTask'),
2556                     'requires the C _asyncio module')
2557class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
2558                          test_utils.TestCase):
2559
2560    Task = getattr(tasks, '_CTask', None)
2561    Future = getattr(futures, '_CFuture', None)
2562
2563    @support.refcount_test
2564    def test_refleaks_in_task___init__(self):
2565        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
2566        async def coro():
2567            pass
2568        task = self.new_task(self.loop, coro())
2569        self.loop.run_until_complete(task)
2570        refs_before = gettotalrefcount()
2571        for i in range(100):
2572            task.__init__(coro(), loop=self.loop)
2573            self.loop.run_until_complete(task)
2574        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
2575
2576    def test_del__log_destroy_pending_segfault(self):
2577        async def coro():
2578            pass
2579        task = self.new_task(self.loop, coro())
2580        self.loop.run_until_complete(task)
2581        with self.assertRaises(AttributeError):
2582            del task._log_destroy_pending
2583
2584
2585@unittest.skipUnless(hasattr(futures, '_CFuture') and
2586                     hasattr(tasks, '_CTask'),
2587                     'requires the C _asyncio module')
2588@add_subclass_tests
2589class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
2590
2591    Task = getattr(tasks, '_CTask', None)
2592    Future = getattr(futures, '_CFuture', None)
2593
2594
2595@unittest.skipUnless(hasattr(tasks, '_CTask'),
2596                     'requires the C _asyncio module')
2597@add_subclass_tests
2598class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
2599
2600    Task = getattr(tasks, '_CTask', None)
2601    Future = futures._PyFuture
2602
2603
2604@unittest.skipUnless(hasattr(futures, '_CFuture'),
2605                     'requires the C _asyncio module')
2606@add_subclass_tests
2607class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
2608
2609    Future = getattr(futures, '_CFuture', None)
2610    Task = tasks._PyTask
2611
2612
2613@unittest.skipUnless(hasattr(tasks, '_CTask'),
2614                     'requires the C _asyncio module')
2615class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
2616
2617    Task = getattr(tasks, '_CTask', None)
2618    Future = futures._PyFuture
2619
2620
2621@unittest.skipUnless(hasattr(futures, '_CFuture'),
2622                     'requires the C _asyncio module')
2623class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
2624
2625    Task = tasks._PyTask
2626    Future = getattr(futures, '_CFuture', None)
2627
2628
2629class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
2630                            test_utils.TestCase):
2631
2632    Task = tasks._PyTask
2633    Future = futures._PyFuture
2634
2635
2636@add_subclass_tests
2637class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
2638    Task = tasks._PyTask
2639    Future = futures._PyFuture
2640
2641
2642@unittest.skipUnless(hasattr(tasks, '_CTask'),
2643                     'requires the C _asyncio module')
2644class CTask_Future_Tests(test_utils.TestCase):
2645
2646    def test_foobar(self):
2647        class Fut(asyncio.Future):
2648            @property
2649            def get_loop(self):
2650                raise AttributeError
2651
2652        async def coro():
2653            await fut
2654            return 'spam'
2655
2656        self.loop = asyncio.new_event_loop()
2657        try:
2658            fut = Fut(loop=self.loop)
2659            self.loop.call_later(0.1, fut.set_result, 1)
2660            task = self.loop.create_task(coro())
2661            res = self.loop.run_until_complete(task)
2662        finally:
2663            self.loop.close()
2664
2665        self.assertEqual(res, 'spam')
2666
2667
2668class BaseTaskIntrospectionTests:
2669    _register_task = None
2670    _unregister_task = None
2671    _enter_task = None
2672    _leave_task = None
2673
2674    def test__register_task_1(self):
2675        class TaskLike:
2676            @property
2677            def _loop(self):
2678                return loop
2679
2680            def done(self):
2681                return False
2682
2683        task = TaskLike()
2684        loop = mock.Mock()
2685
2686        self.assertEqual(asyncio.all_tasks(loop), set())
2687        self._register_task(task)
2688        self.assertEqual(asyncio.all_tasks(loop), {task})
2689        self._unregister_task(task)
2690
2691    def test__register_task_2(self):
2692        class TaskLike:
2693            def get_loop(self):
2694                return loop
2695
2696            def done(self):
2697                return False
2698
2699        task = TaskLike()
2700        loop = mock.Mock()
2701
2702        self.assertEqual(asyncio.all_tasks(loop), set())
2703        self._register_task(task)
2704        self.assertEqual(asyncio.all_tasks(loop), {task})
2705        self._unregister_task(task)
2706
2707    def test__register_task_3(self):
2708        class TaskLike:
2709            def get_loop(self):
2710                return loop
2711
2712            def done(self):
2713                return True
2714
2715        task = TaskLike()
2716        loop = mock.Mock()
2717
2718        self.assertEqual(asyncio.all_tasks(loop), set())
2719        self._register_task(task)
2720        self.assertEqual(asyncio.all_tasks(loop), set())
2721        self._unregister_task(task)
2722
2723    def test__enter_task(self):
2724        task = mock.Mock()
2725        loop = mock.Mock()
2726        self.assertIsNone(asyncio.current_task(loop))
2727        self._enter_task(loop, task)
2728        self.assertIs(asyncio.current_task(loop), task)
2729        self._leave_task(loop, task)
2730
2731    def test__enter_task_failure(self):
2732        task1 = mock.Mock()
2733        task2 = mock.Mock()
2734        loop = mock.Mock()
2735        self._enter_task(loop, task1)
2736        with self.assertRaises(RuntimeError):
2737            self._enter_task(loop, task2)
2738        self.assertIs(asyncio.current_task(loop), task1)
2739        self._leave_task(loop, task1)
2740
2741    def test__leave_task(self):
2742        task = mock.Mock()
2743        loop = mock.Mock()
2744        self._enter_task(loop, task)
2745        self._leave_task(loop, task)
2746        self.assertIsNone(asyncio.current_task(loop))
2747
2748    def test__leave_task_failure1(self):
2749        task1 = mock.Mock()
2750        task2 = mock.Mock()
2751        loop = mock.Mock()
2752        self._enter_task(loop, task1)
2753        with self.assertRaises(RuntimeError):
2754            self._leave_task(loop, task2)
2755        self.assertIs(asyncio.current_task(loop), task1)
2756        self._leave_task(loop, task1)
2757
2758    def test__leave_task_failure2(self):
2759        task = mock.Mock()
2760        loop = mock.Mock()
2761        with self.assertRaises(RuntimeError):
2762            self._leave_task(loop, task)
2763        self.assertIsNone(asyncio.current_task(loop))
2764
2765    def test__unregister_task(self):
2766        task = mock.Mock()
2767        loop = mock.Mock()
2768        task.get_loop = lambda: loop
2769        self._register_task(task)
2770        self._unregister_task(task)
2771        self.assertEqual(asyncio.all_tasks(loop), set())
2772
2773    def test__unregister_task_not_registered(self):
2774        task = mock.Mock()
2775        loop = mock.Mock()
2776        self._unregister_task(task)
2777        self.assertEqual(asyncio.all_tasks(loop), set())
2778
2779
2780class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
2781    _register_task = staticmethod(tasks._py_register_task)
2782    _unregister_task = staticmethod(tasks._py_unregister_task)
2783    _enter_task = staticmethod(tasks._py_enter_task)
2784    _leave_task = staticmethod(tasks._py_leave_task)
2785
2786
2787@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
2788                     'requires the C _asyncio module')
2789class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
2790    if hasattr(tasks, '_c_register_task'):
2791        _register_task = staticmethod(tasks._c_register_task)
2792        _unregister_task = staticmethod(tasks._c_unregister_task)
2793        _enter_task = staticmethod(tasks._c_enter_task)
2794        _leave_task = staticmethod(tasks._c_leave_task)
2795    else:
2796        _register_task = _unregister_task = _enter_task = _leave_task = None
2797
2798
2799class BaseCurrentLoopTests:
2800
2801    def setUp(self):
2802        super().setUp()
2803        self.loop = asyncio.new_event_loop()
2804        self.set_event_loop(self.loop)
2805
2806    def new_task(self, coro):
2807        raise NotImplementedError
2808
2809    def test_current_task_no_running_loop(self):
2810        self.assertIsNone(asyncio.current_task(loop=self.loop))
2811
2812    def test_current_task_no_running_loop_implicit(self):
2813        with self.assertRaisesRegex(RuntimeError, 'no running event loop'):
2814            asyncio.current_task()
2815
2816    def test_current_task_with_implicit_loop(self):
2817        async def coro():
2818            self.assertIs(asyncio.current_task(loop=self.loop), task)
2819
2820            self.assertIs(asyncio.current_task(None), task)
2821            self.assertIs(asyncio.current_task(), task)
2822
2823        task = self.new_task(coro())
2824        self.loop.run_until_complete(task)
2825        self.assertIsNone(asyncio.current_task(loop=self.loop))
2826
2827
2828class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
2829
2830    def new_task(self, coro):
2831        return tasks._PyTask(coro, loop=self.loop)
2832
2833
2834@unittest.skipUnless(hasattr(tasks, '_CTask'),
2835                     'requires the C _asyncio module')
2836class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
2837
2838    def new_task(self, coro):
2839        return getattr(tasks, '_CTask')(coro, loop=self.loop)
2840
2841
2842class GenericTaskTests(test_utils.TestCase):
2843
2844    def test_future_subclass(self):
2845        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))
2846
2847    @support.cpython_only
2848    def test_asyncio_module_compiled(self):
2849        # Because of circular imports it's easy to make _asyncio
2850        # module non-importable.  This is a simple test that will
2851        # fail on systems where C modules were successfully compiled
2852        # (hence the test for _functools etc), but _asyncio somehow didn't.
2853        try:
2854            import _functools
2855            import _json
2856            import _pickle
2857        except ImportError:
2858            self.skipTest('C modules are not available')
2859        else:
2860            try:
2861                import _asyncio
2862            except ImportError:
2863                self.fail('_asyncio module is missing')
2864
2865
2866class GatherTestsBase:
2867
2868    def setUp(self):
2869        super().setUp()
2870        self.one_loop = self.new_test_loop()
2871        self.other_loop = self.new_test_loop()
2872        self.set_event_loop(self.one_loop, cleanup=False)
2873
2874    def _run_loop(self, loop):
2875        while loop._ready:
2876            test_utils.run_briefly(loop)
2877
2878    def _check_success(self, **kwargs):
2879        a, b, c = [self.one_loop.create_future() for i in range(3)]
2880        fut = self._gather(*self.wrap_futures(a, b, c), **kwargs)
2881        cb = test_utils.MockCallback()
2882        fut.add_done_callback(cb)
2883        b.set_result(1)
2884        a.set_result(2)
2885        self._run_loop(self.one_loop)
2886        self.assertEqual(cb.called, False)
2887        self.assertFalse(fut.done())
2888        c.set_result(3)
2889        self._run_loop(self.one_loop)
2890        cb.assert_called_once_with(fut)
2891        self.assertEqual(fut.result(), [2, 1, 3])
2892
2893    def test_success(self):
2894        self._check_success()
2895        self._check_success(return_exceptions=False)
2896
2897    def test_result_exception_success(self):
2898        self._check_success(return_exceptions=True)
2899
2900    def test_one_exception(self):
2901        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
2902        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
2903        cb = test_utils.MockCallback()
2904        fut.add_done_callback(cb)
2905        exc = ZeroDivisionError()
2906        a.set_result(1)
2907        b.set_exception(exc)
2908        self._run_loop(self.one_loop)
2909        self.assertTrue(fut.done())
2910        cb.assert_called_once_with(fut)
2911        self.assertIs(fut.exception(), exc)
2912        # Does nothing
2913        c.set_result(3)
2914        d.cancel()
2915        e.set_exception(RuntimeError())
2916        e.exception()
2917
2918    def test_return_exceptions(self):
2919        a, b, c, d = [self.one_loop.create_future() for i in range(4)]
2920        fut = self._gather(*self.wrap_futures(a, b, c, d),
2921                           return_exceptions=True)
2922        cb = test_utils.MockCallback()
2923        fut.add_done_callback(cb)
2924        exc = ZeroDivisionError()
2925        exc2 = RuntimeError()
2926        b.set_result(1)
2927        c.set_exception(exc)
2928        a.set_result(3)
2929        self._run_loop(self.one_loop)
2930        self.assertFalse(fut.done())
2931        d.set_exception(exc2)
2932        self._run_loop(self.one_loop)
2933        self.assertTrue(fut.done())
2934        cb.assert_called_once_with(fut)
2935        self.assertEqual(fut.result(), [3, 1, exc, exc2])
2936
2937    def test_env_var_debug(self):
2938        code = '\n'.join((
2939            'import asyncio.coroutines',
2940            'print(asyncio.coroutines._is_debug_mode())'))
2941
2942        # Test with -E to not fail if the unit test was run with
2943        # PYTHONASYNCIODEBUG set to a non-empty string
2944        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
2945        self.assertEqual(stdout.rstrip(), b'False')
2946
2947        sts, stdout, stderr = assert_python_ok('-c', code,
2948                                               PYTHONASYNCIODEBUG='',
2949                                               PYTHONDEVMODE='')
2950        self.assertEqual(stdout.rstrip(), b'False')
2951
2952        sts, stdout, stderr = assert_python_ok('-c', code,
2953                                               PYTHONASYNCIODEBUG='1',
2954                                               PYTHONDEVMODE='')
2955        self.assertEqual(stdout.rstrip(), b'True')
2956
2957        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
2958                                               PYTHONASYNCIODEBUG='1',
2959                                               PYTHONDEVMODE='')
2960        self.assertEqual(stdout.rstrip(), b'False')
2961
2962        # -X dev
2963        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
2964                                               '-c', code)
2965        self.assertEqual(stdout.rstrip(), b'True')
2966
2967
2968class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
2969
2970    def wrap_futures(self, *futures):
2971        return futures
2972
2973    def _gather(self, *args, **kwargs):
2974        return asyncio.gather(*args, **kwargs)
2975
2976    def test_constructor_empty_sequence_without_loop(self):
2977        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
2978            asyncio.gather()
2979
2980    def test_constructor_empty_sequence_use_running_loop(self):
2981        async def gather():
2982            return asyncio.gather()
2983        fut = self.one_loop.run_until_complete(gather())
2984        self.assertIsInstance(fut, asyncio.Future)
2985        self.assertIs(fut._loop, self.one_loop)
2986        self._run_loop(self.one_loop)
2987        self.assertTrue(fut.done())
2988        self.assertEqual(fut.result(), [])
2989
2990    def test_constructor_empty_sequence_use_global_loop(self):
2991        # Deprecated in 3.10, undeprecated in 3.11.1
2992        asyncio.set_event_loop(self.one_loop)
2993        self.addCleanup(asyncio.set_event_loop, None)
2994        fut = asyncio.gather()
2995        self.assertIsInstance(fut, asyncio.Future)
2996        self.assertIs(fut._loop, self.one_loop)
2997        self._run_loop(self.one_loop)
2998        self.assertTrue(fut.done())
2999        self.assertEqual(fut.result(), [])
3000
3001    def test_constructor_heterogenous_futures(self):
3002        fut1 = self.one_loop.create_future()
3003        fut2 = self.other_loop.create_future()
3004        with self.assertRaises(ValueError):
3005            asyncio.gather(fut1, fut2)
3006
3007    def test_constructor_homogenous_futures(self):
3008        children = [self.other_loop.create_future() for i in range(3)]
3009        fut = asyncio.gather(*children)
3010        self.assertIs(fut._loop, self.other_loop)
3011        self._run_loop(self.other_loop)
3012        self.assertFalse(fut.done())
3013        fut = asyncio.gather(*children)
3014        self.assertIs(fut._loop, self.other_loop)
3015        self._run_loop(self.other_loop)
3016        self.assertFalse(fut.done())
3017
3018    def test_one_cancellation(self):
3019        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
3020        fut = asyncio.gather(a, b, c, d, e)
3021        cb = test_utils.MockCallback()
3022        fut.add_done_callback(cb)
3023        a.set_result(1)
3024        b.cancel()
3025        self._run_loop(self.one_loop)
3026        self.assertTrue(fut.done())
3027        cb.assert_called_once_with(fut)
3028        self.assertFalse(fut.cancelled())
3029        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
3030        # Does nothing
3031        c.set_result(3)
3032        d.cancel()
3033        e.set_exception(RuntimeError())
3034        e.exception()
3035
3036    def test_result_exception_one_cancellation(self):
3037        a, b, c, d, e, f = [self.one_loop.create_future()
3038                            for i in range(6)]
3039        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
3040        cb = test_utils.MockCallback()
3041        fut.add_done_callback(cb)
3042        a.set_result(1)
3043        zde = ZeroDivisionError()
3044        b.set_exception(zde)
3045        c.cancel()
3046        self._run_loop(self.one_loop)
3047        self.assertFalse(fut.done())
3048        d.set_result(3)
3049        e.cancel()
3050        rte = RuntimeError()
3051        f.set_exception(rte)
3052        res = self.one_loop.run_until_complete(fut)
3053        self.assertIsInstance(res[2], asyncio.CancelledError)
3054        self.assertIsInstance(res[4], asyncio.CancelledError)
3055        res[2] = res[4] = None
3056        self.assertEqual(res, [1, zde, None, 3, None, rte])
3057        cb.assert_called_once_with(fut)
3058
3059
3060class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
3061
3062    def wrap_futures(self, *futures):
3063        coros = []
3064        for fut in futures:
3065            async def coro(fut=fut):
3066                return await fut
3067            coros.append(coro())
3068        return coros
3069
3070    def _gather(self, *args, **kwargs):
3071        async def coro():
3072            return asyncio.gather(*args, **kwargs)
3073        return self.one_loop.run_until_complete(coro())
3074
3075    def test_constructor_without_loop(self):
3076        async def coro():
3077            return 'abc'
3078        gen1 = coro()
3079        self.addCleanup(gen1.close)
3080        gen2 = coro()
3081        self.addCleanup(gen2.close)
3082        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
3083            asyncio.gather(gen1, gen2)
3084
3085    def test_constructor_use_running_loop(self):
3086        async def coro():
3087            return 'abc'
3088        gen1 = coro()
3089        gen2 = coro()
3090        async def gather():
3091            return asyncio.gather(gen1, gen2)
3092        fut = self.one_loop.run_until_complete(gather())
3093        self.assertIs(fut._loop, self.one_loop)
3094        self.one_loop.run_until_complete(fut)
3095
3096    def test_constructor_use_global_loop(self):
3097        # Deprecated in 3.10, undeprecated in 3.11.1
3098        async def coro():
3099            return 'abc'
3100        asyncio.set_event_loop(self.other_loop)
3101        self.addCleanup(asyncio.set_event_loop, None)
3102        gen1 = coro()
3103        gen2 = coro()
3104        fut = asyncio.gather(gen1, gen2)
3105        self.assertIs(fut._loop, self.other_loop)
3106        self.other_loop.run_until_complete(fut)
3107
3108    def test_duplicate_coroutines(self):
3109        async def coro(s):
3110            return s
3111        c = coro('abc')
3112        fut = self._gather(c, c, coro('def'), c)
3113        self._run_loop(self.one_loop)
3114        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
3115
3116    def test_cancellation_broadcast(self):
3117        # Cancelling outer() cancels all children.
3118        proof = 0
3119        waiter = self.one_loop.create_future()
3120
3121        async def inner():
3122            nonlocal proof
3123            await waiter
3124            proof += 1
3125
3126        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
3127        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
3128        gatherer = None
3129
3130        async def outer():
3131            nonlocal proof, gatherer
3132            gatherer = asyncio.gather(child1, child2)
3133            await gatherer
3134            proof += 100
3135
3136        f = asyncio.ensure_future(outer(), loop=self.one_loop)
3137        test_utils.run_briefly(self.one_loop)
3138        self.assertTrue(f.cancel())
3139        with self.assertRaises(asyncio.CancelledError):
3140            self.one_loop.run_until_complete(f)
3141        self.assertFalse(gatherer.cancel())
3142        self.assertTrue(waiter.cancelled())
3143        self.assertTrue(child1.cancelled())
3144        self.assertTrue(child2.cancelled())
3145        test_utils.run_briefly(self.one_loop)
3146        self.assertEqual(proof, 0)
3147
3148    def test_exception_marking(self):
3149        # Test for the first line marked "Mark exception retrieved."
3150
3151        async def inner(f):
3152            await f
3153            raise RuntimeError('should not be ignored')
3154
3155        a = self.one_loop.create_future()
3156        b = self.one_loop.create_future()
3157
3158        async def outer():
3159            await asyncio.gather(inner(a), inner(b))
3160
3161        f = asyncio.ensure_future(outer(), loop=self.one_loop)
3162        test_utils.run_briefly(self.one_loop)
3163        a.set_result(None)
3164        test_utils.run_briefly(self.one_loop)
3165        b.set_result(None)
3166        test_utils.run_briefly(self.one_loop)
3167        self.assertIsInstance(f.exception(), RuntimeError)
3168
3169    def test_issue46672(self):
3170        with mock.patch(
3171            'asyncio.base_events.BaseEventLoop.call_exception_handler',
3172        ):
3173            async def coro(s):
3174                return s
3175            c = coro('abc')
3176
3177            with self.assertRaises(TypeError):
3178                self._gather(c, {})
3179            self._run_loop(self.one_loop)
3180            # NameError should not happen:
3181            self.one_loop.call_exception_handler.assert_not_called()
3182
3183
3184class RunCoroutineThreadsafeTests(test_utils.TestCase):
3185    """Test case for asyncio.run_coroutine_threadsafe."""
3186
3187    def setUp(self):
3188        super().setUp()
3189        self.loop = asyncio.new_event_loop()
3190        self.set_event_loop(self.loop) # Will cleanup properly
3191
3192    async def add(self, a, b, fail=False, cancel=False):
3193        """Wait 0.05 second and return a + b."""
3194        await asyncio.sleep(0.05)
3195        if fail:
3196            raise RuntimeError("Fail!")
3197        if cancel:
3198            asyncio.current_task(self.loop).cancel()
3199            await asyncio.sleep(0)
3200        return a + b
3201
3202    def target(self, fail=False, cancel=False, timeout=None,
3203               advance_coro=False):
3204        """Run add coroutine in the event loop."""
3205        coro = self.add(1, 2, fail=fail, cancel=cancel)
3206        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
3207        if advance_coro:
3208            # this is for test_run_coroutine_threadsafe_task_factory_exception;
3209            # otherwise it spills errors and breaks **other** unittests, since
3210            # 'target' is interacting with threads.
3211
3212            # With this call, `coro` will be advanced.
3213            self.loop.call_soon_threadsafe(coro.send, None)
3214        try:
3215            return future.result(timeout)
3216        finally:
3217            future.done() or future.cancel()
3218
3219    def test_run_coroutine_threadsafe(self):
3220        """Test coroutine submission from a thread to an event loop."""
3221        future = self.loop.run_in_executor(None, self.target)
3222        result = self.loop.run_until_complete(future)
3223        self.assertEqual(result, 3)
3224
3225    def test_run_coroutine_threadsafe_with_exception(self):
3226        """Test coroutine submission from a thread to an event loop
3227        when an exception is raised."""
3228        future = self.loop.run_in_executor(None, self.target, True)
3229        with self.assertRaises(RuntimeError) as exc_context:
3230            self.loop.run_until_complete(future)
3231        self.assertIn("Fail!", exc_context.exception.args)
3232
3233    def test_run_coroutine_threadsafe_with_timeout(self):
3234        """Test coroutine submission from a thread to an event loop
3235        when a timeout is raised."""
3236        callback = lambda: self.target(timeout=0)
3237        future = self.loop.run_in_executor(None, callback)
3238        with self.assertRaises(asyncio.TimeoutError):
3239            self.loop.run_until_complete(future)
3240        test_utils.run_briefly(self.loop)
3241        # Check that there's no pending task (add has been cancelled)
3242        for task in asyncio.all_tasks(self.loop):
3243            self.assertTrue(task.done())
3244
3245    def test_run_coroutine_threadsafe_task_cancelled(self):
3246        """Test coroutine submission from a thread to an event loop
3247        when the task is cancelled."""
3248        callback = lambda: self.target(cancel=True)
3249        future = self.loop.run_in_executor(None, callback)
3250        with self.assertRaises(asyncio.CancelledError):
3251            self.loop.run_until_complete(future)
3252
3253    def test_run_coroutine_threadsafe_task_factory_exception(self):
3254        """Test coroutine submission from a thread to an event loop
3255        when the task factory raise an exception."""
3256
3257        def task_factory(loop, coro):
3258            raise NameError
3259
3260        run = self.loop.run_in_executor(
3261            None, lambda: self.target(advance_coro=True))
3262
3263        # Set exception handler
3264        callback = test_utils.MockCallback()
3265        self.loop.set_exception_handler(callback)
3266
3267        # Set corrupted task factory
3268        self.addCleanup(self.loop.set_task_factory,
3269                        self.loop.get_task_factory())
3270        self.loop.set_task_factory(task_factory)
3271
3272        # Run event loop
3273        with self.assertRaises(NameError) as exc_context:
3274            self.loop.run_until_complete(run)
3275
3276        # Check exceptions
3277        self.assertEqual(len(callback.call_args_list), 1)
3278        (loop, context), kwargs = callback.call_args
3279        self.assertEqual(context['exception'], exc_context.exception)
3280
3281
3282class SleepTests(test_utils.TestCase):
3283    def setUp(self):
3284        super().setUp()
3285        self.loop = asyncio.new_event_loop()
3286        self.set_event_loop(self.loop)
3287
3288    def tearDown(self):
3289        self.loop.close()
3290        self.loop = None
3291        super().tearDown()
3292
3293    def test_sleep_zero(self):
3294        result = 0
3295
3296        def inc_result(num):
3297            nonlocal result
3298            result += num
3299
3300        async def coro():
3301            self.loop.call_soon(inc_result, 1)
3302            self.assertEqual(result, 0)
3303            num = await asyncio.sleep(0, result=10)
3304            self.assertEqual(result, 1) # inc'ed by call_soon
3305            inc_result(num) # num should be 11
3306
3307        self.loop.run_until_complete(coro())
3308        self.assertEqual(result, 11)
3309
3310
3311class CompatibilityTests(test_utils.TestCase):
3312    # Tests for checking a bridge between old-styled coroutines
3313    # and async/await syntax
3314
3315    def setUp(self):
3316        super().setUp()
3317        self.loop = asyncio.new_event_loop()
3318        self.set_event_loop(self.loop)
3319
3320    def tearDown(self):
3321        self.loop.close()
3322        self.loop = None
3323        super().tearDown()
3324
3325
3326if __name__ == '__main__':
3327    unittest.main()
3328