1"""Tests for asyncio/timeouts.py"""
2
3import unittest
4import time
5
6import asyncio
7from asyncio import tasks
8
9
10def tearDownModule():
11    asyncio.set_event_loop_policy(None)
12
13
14class TimeoutTests(unittest.IsolatedAsyncioTestCase):
15
16    async def test_timeout_basic(self):
17        with self.assertRaises(TimeoutError):
18            async with asyncio.timeout(0.01) as cm:
19                await asyncio.sleep(10)
20        self.assertTrue(cm.expired())
21
22    async def test_timeout_at_basic(self):
23        loop = asyncio.get_running_loop()
24
25        with self.assertRaises(TimeoutError):
26            deadline = loop.time() + 0.01
27            async with asyncio.timeout_at(deadline) as cm:
28                await asyncio.sleep(10)
29        self.assertTrue(cm.expired())
30        self.assertEqual(deadline, cm.when())
31
32    async def test_nested_timeouts(self):
33        loop = asyncio.get_running_loop()
34        cancelled = False
35        with self.assertRaises(TimeoutError):
36            deadline = loop.time() + 0.01
37            async with asyncio.timeout_at(deadline) as cm1:
38                # Only the topmost context manager should raise TimeoutError
39                try:
40                    async with asyncio.timeout_at(deadline) as cm2:
41                        await asyncio.sleep(10)
42                except asyncio.CancelledError:
43                    cancelled = True
44                    raise
45        self.assertTrue(cancelled)
46        self.assertTrue(cm1.expired())
47        self.assertTrue(cm2.expired())
48
49    async def test_waiter_cancelled(self):
50        loop = asyncio.get_running_loop()
51        cancelled = False
52        with self.assertRaises(TimeoutError):
53            async with asyncio.timeout(0.01):
54                try:
55                    await asyncio.sleep(10)
56                except asyncio.CancelledError:
57                    cancelled = True
58                    raise
59        self.assertTrue(cancelled)
60
61    async def test_timeout_not_called(self):
62        loop = asyncio.get_running_loop()
63        t0 = loop.time()
64        async with asyncio.timeout(10) as cm:
65            await asyncio.sleep(0.01)
66        t1 = loop.time()
67
68        self.assertFalse(cm.expired())
69        # 2 sec for slow CI boxes
70        self.assertLess(t1-t0, 2)
71        self.assertGreater(cm.when(), t1)
72
73    async def test_timeout_disabled(self):
74        loop = asyncio.get_running_loop()
75        t0 = loop.time()
76        async with asyncio.timeout(None) as cm:
77            await asyncio.sleep(0.01)
78        t1 = loop.time()
79
80        self.assertFalse(cm.expired())
81        self.assertIsNone(cm.when())
82        # 2 sec for slow CI boxes
83        self.assertLess(t1-t0, 2)
84
85    async def test_timeout_at_disabled(self):
86        loop = asyncio.get_running_loop()
87        t0 = loop.time()
88        async with asyncio.timeout_at(None) as cm:
89            await asyncio.sleep(0.01)
90        t1 = loop.time()
91
92        self.assertFalse(cm.expired())
93        self.assertIsNone(cm.when())
94        # 2 sec for slow CI boxes
95        self.assertLess(t1-t0, 2)
96
97    async def test_timeout_zero(self):
98        loop = asyncio.get_running_loop()
99        t0 = loop.time()
100        with self.assertRaises(TimeoutError):
101            async with asyncio.timeout(0) as cm:
102                await asyncio.sleep(10)
103        t1 = loop.time()
104        self.assertTrue(cm.expired())
105        # 2 sec for slow CI boxes
106        self.assertLess(t1-t0, 2)
107        self.assertTrue(t0 <= cm.when() <= t1)
108
109    async def test_timeout_zero_sleep_zero(self):
110        loop = asyncio.get_running_loop()
111        t0 = loop.time()
112        with self.assertRaises(TimeoutError):
113            async with asyncio.timeout(0) as cm:
114                await asyncio.sleep(0)
115        t1 = loop.time()
116        self.assertTrue(cm.expired())
117        # 2 sec for slow CI boxes
118        self.assertLess(t1-t0, 2)
119        self.assertTrue(t0 <= cm.when() <= t1)
120
121    async def test_timeout_in_the_past_sleep_zero(self):
122        loop = asyncio.get_running_loop()
123        t0 = loop.time()
124        with self.assertRaises(TimeoutError):
125            async with asyncio.timeout(-11) as cm:
126                await asyncio.sleep(0)
127        t1 = loop.time()
128        self.assertTrue(cm.expired())
129        # 2 sec for slow CI boxes
130        self.assertLess(t1-t0, 2)
131        self.assertTrue(t0 >= cm.when() <= t1)
132
133    async def test_foreign_exception_passed(self):
134        with self.assertRaises(KeyError):
135            async with asyncio.timeout(0.01) as cm:
136                raise KeyError
137        self.assertFalse(cm.expired())
138
139    async def test_foreign_exception_on_timeout(self):
140        async def crash():
141            try:
142                await asyncio.sleep(1)
143            finally:
144                1/0
145        with self.assertRaises(ZeroDivisionError):
146            async with asyncio.timeout(0.01):
147                await crash()
148
149    async def test_foreign_cancel_doesnt_timeout_if_not_expired(self):
150        with self.assertRaises(asyncio.CancelledError):
151            async with asyncio.timeout(10) as cm:
152                asyncio.current_task().cancel()
153                await asyncio.sleep(10)
154        self.assertFalse(cm.expired())
155
156    async def test_outer_task_is_not_cancelled(self):
157        async def outer() -> None:
158            with self.assertRaises(TimeoutError):
159                async with asyncio.timeout(0.001):
160                    await asyncio.sleep(10)
161
162        task = asyncio.create_task(outer())
163        await task
164        self.assertFalse(task.cancelled())
165        self.assertTrue(task.done())
166
167    async def test_nested_timeouts_concurrent(self):
168        with self.assertRaises(TimeoutError):
169            async with asyncio.timeout(0.002):
170                with self.assertRaises(TimeoutError):
171                    async with asyncio.timeout(0.1):
172                        # Pretend we crunch some numbers.
173                        time.sleep(0.01)
174                        await asyncio.sleep(1)
175
176    async def test_nested_timeouts_loop_busy(self):
177        # After the inner timeout is an expensive operation which should
178        # be stopped by the outer timeout.
179        loop = asyncio.get_running_loop()
180        # Disable a message about long running task
181        loop.slow_callback_duration = 10
182        t0 = loop.time()
183        with self.assertRaises(TimeoutError):
184            async with asyncio.timeout(0.1):  # (1)
185                with self.assertRaises(TimeoutError):
186                    async with asyncio.timeout(0.01):  # (2)
187                        # Pretend the loop is busy for a while.
188                        time.sleep(0.1)
189                        await asyncio.sleep(1)
190                # TimeoutError was cought by (2)
191                await asyncio.sleep(10) # This sleep should be interrupted by (1)
192        t1 = loop.time()
193        self.assertTrue(t0 <= t1 <= t0 + 1)
194
195    async def test_reschedule(self):
196        loop = asyncio.get_running_loop()
197        fut = loop.create_future()
198        deadline1 = loop.time() + 10
199        deadline2 = deadline1 + 20
200
201        async def f():
202            async with asyncio.timeout_at(deadline1) as cm:
203                fut.set_result(cm)
204                await asyncio.sleep(50)
205
206        task = asyncio.create_task(f())
207        cm = await fut
208
209        self.assertEqual(cm.when(), deadline1)
210        cm.reschedule(deadline2)
211        self.assertEqual(cm.when(), deadline2)
212        cm.reschedule(None)
213        self.assertIsNone(cm.when())
214
215        task.cancel()
216
217        with self.assertRaises(asyncio.CancelledError):
218            await task
219        self.assertFalse(cm.expired())
220
221    async def test_repr_active(self):
222        async with asyncio.timeout(10) as cm:
223            self.assertRegex(repr(cm), r"<Timeout \[active\] when=\d+\.\d*>")
224
225    async def test_repr_expired(self):
226        with self.assertRaises(TimeoutError):
227            async with asyncio.timeout(0.01) as cm:
228                await asyncio.sleep(10)
229        self.assertEqual(repr(cm), "<Timeout [expired]>")
230
231    async def test_repr_finished(self):
232        async with asyncio.timeout(10) as cm:
233            await asyncio.sleep(0)
234
235        self.assertEqual(repr(cm), "<Timeout [finished]>")
236
237    async def test_repr_disabled(self):
238        async with asyncio.timeout(None) as cm:
239            self.assertEqual(repr(cm), r"<Timeout [active] when=None>")
240
241    async def test_nested_timeout_in_finally(self):
242        with self.assertRaises(TimeoutError):
243            async with asyncio.timeout(0.01):
244                try:
245                    await asyncio.sleep(1)
246                finally:
247                    with self.assertRaises(TimeoutError):
248                        async with asyncio.timeout(0.01):
249                            await asyncio.sleep(10)
250
251    async def test_timeout_after_cancellation(self):
252        try:
253            asyncio.current_task().cancel()
254            await asyncio.sleep(1)  # work which will be cancelled
255        except asyncio.CancelledError:
256            pass
257        finally:
258            with self.assertRaises(TimeoutError):
259                async with asyncio.timeout(0.0):
260                    await asyncio.sleep(1)  # some cleanup
261
262    async def test_cancel_in_timeout_after_cancellation(self):
263        try:
264            asyncio.current_task().cancel()
265            await asyncio.sleep(1)  # work which will be cancelled
266        except asyncio.CancelledError:
267            pass
268        finally:
269            with self.assertRaises(asyncio.CancelledError):
270                async with asyncio.timeout(1.0):
271                    asyncio.current_task().cancel()
272                    await asyncio.sleep(2)  # some cleanup
273
274    async def test_timeout_exception_cause (self):
275        with self.assertRaises(asyncio.TimeoutError) as exc:
276            async with asyncio.timeout(0):
277                await asyncio.sleep(1)
278        cause = exc.exception.__cause__
279        assert isinstance(cause, asyncio.CancelledError)
280
281
282if __name__ == '__main__':
283    unittest.main()
284