1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 
7 use futures::{Stream, StreamExt};
8 use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior};
9 use tokio_test::{assert_pending, assert_ready, assert_ready_eq, task};
10 
11 // Takes the `Interval` task, `start` variable, and optional time deltas
12 // For each time delta, it polls the `Interval` and asserts that the result is
13 // equal to `start` + the specific time delta. Then it asserts that the
14 // `Interval` is pending.
15 macro_rules! check_interval_poll {
16     ($i:ident, $start:ident, $($delta:expr),*$(,)?) => {
17         $(
18             assert_ready_eq!(poll_next(&mut $i), $start + ms($delta));
19         )*
20         assert_pending!(poll_next(&mut $i));
21     };
22     ($i:ident, $start:ident) => {
23         check_interval_poll!($i, $start,);
24     };
25 }
26 
27 #[tokio::test]
28 #[should_panic]
interval_zero_duration()29 async fn interval_zero_duration() {
30     let _ = time::interval_at(Instant::now(), ms(0));
31 }
32 
33 // Expected ticks: |     1     |     2     |     3     |     4     |     5     |     6     |
34 // Actual ticks:   | work -----|          delay          | work | work | work -| work -----|
35 // Poll behavior:  |   |       |                         |      |      |       |           |
36 //                 |   |       |                         |      |      |       |           |
37 //          Ready(s)   |       |             Ready(s + 2p)      |      |       |           |
38 //               Pending       |                    Ready(s + 3p)      |       |           |
39 //                  Ready(s + p)                           Ready(s + 4p)       |           |
40 //                                                                 Ready(s + 5p)           |
41 //                                                                             Ready(s + 6p)
42 #[tokio::test(start_paused = true)]
burst()43 async fn burst() {
44     let start = Instant::now();
45 
46     // This is necessary because the timer is only so granular, and in order for
47     // all our ticks to resolve, the time needs to be 1ms ahead of what we
48     // expect, so that the runtime will see that it is time to resolve the timer
49     time::advance(ms(1)).await;
50 
51     let mut i = task::spawn(time::interval_at(start, ms(300)));
52 
53     check_interval_poll!(i, start, 0);
54 
55     time::advance(ms(100)).await;
56     check_interval_poll!(i, start);
57 
58     time::advance(ms(200)).await;
59     check_interval_poll!(i, start, 300);
60 
61     time::advance(ms(650)).await;
62     check_interval_poll!(i, start, 600, 900);
63 
64     time::advance(ms(200)).await;
65     check_interval_poll!(i, start);
66 
67     time::advance(ms(100)).await;
68     check_interval_poll!(i, start, 1200);
69 
70     time::advance(ms(250)).await;
71     check_interval_poll!(i, start, 1500);
72 
73     time::advance(ms(300)).await;
74     check_interval_poll!(i, start, 1800);
75 }
76 
77 // Expected ticks: |     1     |     2     |     3     |     4     |     5     |     6     |
78 // Actual ticks:   | work -----|          delay          | work -----| work -----| work -----|
79 // Poll behavior:  |   |       |                         |   |       |           |           |
80 //                 |   |       |                         |   |       |           |           |
81 //          Ready(s)   |       |             Ready(s + 2p)   |       |           |           |
82 //               Pending       |                       Pending       |           |           |
83 //                  Ready(s + p)                     Ready(s + 2p + d)           |           |
84 //                                                               Ready(s + 3p + d)           |
85 //                                                                           Ready(s + 4p + d)
86 #[tokio::test(start_paused = true)]
delay()87 async fn delay() {
88     let start = Instant::now();
89 
90     // This is necessary because the timer is only so granular, and in order for
91     // all our ticks to resolve, the time needs to be 1ms ahead of what we
92     // expect, so that the runtime will see that it is time to resolve the timer
93     time::advance(ms(1)).await;
94 
95     let mut i = task::spawn(time::interval_at(start, ms(300)));
96     i.set_missed_tick_behavior(MissedTickBehavior::Delay);
97 
98     check_interval_poll!(i, start, 0);
99 
100     time::advance(ms(100)).await;
101     check_interval_poll!(i, start);
102 
103     time::advance(ms(200)).await;
104     check_interval_poll!(i, start, 300);
105 
106     time::advance(ms(650)).await;
107     check_interval_poll!(i, start, 600);
108 
109     time::advance(ms(100)).await;
110     check_interval_poll!(i, start);
111 
112     // We have to add one here for the same reason as is above.
113     // Because `Interval` has reset its timer according to `Instant::now()`,
114     // we have to go forward 1 more millisecond than is expected so that the
115     // runtime realizes that it's time to resolve the timer.
116     time::advance(ms(201)).await;
117     // We add one because when using the `Delay` behavior, `Interval`
118     // adds the `period` from `Instant::now()`, which will always be off by one
119     // because we have to advance time by 1 (see above).
120     check_interval_poll!(i, start, 1251);
121 
122     time::advance(ms(300)).await;
123     // Again, we add one.
124     check_interval_poll!(i, start, 1551);
125 
126     time::advance(ms(300)).await;
127     check_interval_poll!(i, start, 1851);
128 }
129 
130 // Expected ticks: |     1     |     2     |     3     |     4     |     5     |     6     |
131 // Actual ticks:   | work -----|          delay          | work ---| work -----| work -----|
132 // Poll behavior:  |   |       |                         |         |           |           |
133 //                 |   |       |                         |         |           |           |
134 //          Ready(s)   |       |             Ready(s + 2p)         |           |           |
135 //               Pending       |                       Ready(s + 4p)           |           |
136 //                  Ready(s + p)                                   Ready(s + 5p)           |
137 //                                                                             Ready(s + 6p)
138 #[tokio::test(start_paused = true)]
skip()139 async fn skip() {
140     let start = Instant::now();
141 
142     // This is necessary because the timer is only so granular, and in order for
143     // all our ticks to resolve, the time needs to be 1ms ahead of what we
144     // expect, so that the runtime will see that it is time to resolve the timer
145     time::advance(ms(1)).await;
146 
147     let mut i = task::spawn(time::interval_at(start, ms(300)));
148     i.set_missed_tick_behavior(MissedTickBehavior::Skip);
149 
150     check_interval_poll!(i, start, 0);
151 
152     time::advance(ms(100)).await;
153     check_interval_poll!(i, start);
154 
155     time::advance(ms(200)).await;
156     check_interval_poll!(i, start, 300);
157 
158     time::advance(ms(650)).await;
159     check_interval_poll!(i, start, 600);
160 
161     time::advance(ms(250)).await;
162     check_interval_poll!(i, start, 1200);
163 
164     time::advance(ms(300)).await;
165     check_interval_poll!(i, start, 1500);
166 
167     time::advance(ms(300)).await;
168     check_interval_poll!(i, start, 1800);
169 }
170 
171 #[tokio::test(start_paused = true)]
reset()172 async fn reset() {
173     let start = Instant::now();
174 
175     // This is necessary because the timer is only so granular, and in order for
176     // all our ticks to resolve, the time needs to be 1ms ahead of what we
177     // expect, so that the runtime will see that it is time to resolve the timer
178     time::advance(ms(1)).await;
179 
180     let mut i = task::spawn(time::interval_at(start, ms(300)));
181 
182     check_interval_poll!(i, start, 0);
183 
184     time::advance(ms(100)).await;
185     check_interval_poll!(i, start);
186 
187     time::advance(ms(200)).await;
188     check_interval_poll!(i, start, 300);
189 
190     time::advance(ms(100)).await;
191     check_interval_poll!(i, start);
192 
193     i.reset();
194 
195     time::advance(ms(250)).await;
196     check_interval_poll!(i, start);
197 
198     time::advance(ms(50)).await;
199     // We add one because when using `reset` method, `Interval` adds the
200     // `period` from `Instant::now()`, which will always be off by one
201     check_interval_poll!(i, start, 701);
202 
203     time::advance(ms(300)).await;
204     check_interval_poll!(i, start, 1001);
205 }
206 
207 #[tokio::test(start_paused = true)]
reset_immediately()208 async fn reset_immediately() {
209     let start = Instant::now();
210 
211     // This is necessary because the timer is only so granular, and in order for
212     // all our ticks to resolve, the time needs to be 1ms ahead of what we
213     // expect, so that the runtime will see that it is time to resolve the timer
214     time::advance(ms(1)).await;
215 
216     let mut i = task::spawn(time::interval_at(start, ms(300)));
217 
218     check_interval_poll!(i, start, 0);
219 
220     time::advance(ms(100)).await;
221     check_interval_poll!(i, start);
222 
223     time::advance(ms(200)).await;
224     check_interval_poll!(i, start, 300);
225 
226     time::advance(ms(100)).await;
227     check_interval_poll!(i, start);
228 
229     i.reset_immediately();
230 
231     // We add one because when using `reset` method, `Interval` adds the
232     // `period` from `Instant::now()`, which will always be off by one
233     check_interval_poll!(i, start, 401);
234 
235     time::advance(ms(100)).await;
236     check_interval_poll!(i, start);
237 
238     time::advance(ms(200)).await;
239     check_interval_poll!(i, start, 701);
240 }
241 
242 #[tokio::test(start_paused = true)]
reset_after()243 async fn reset_after() {
244     let start = Instant::now();
245 
246     // This is necessary because the timer is only so granular, and in order for
247     // all our ticks to resolve, the time needs to be 1ms ahead of what we
248     // expect, so that the runtime will see that it is time to resolve the timer
249     time::advance(ms(1)).await;
250 
251     let mut i = task::spawn(time::interval_at(start, ms(300)));
252 
253     check_interval_poll!(i, start, 0);
254 
255     time::advance(ms(100)).await;
256     check_interval_poll!(i, start);
257 
258     time::advance(ms(200)).await;
259     check_interval_poll!(i, start, 300);
260 
261     time::advance(ms(100)).await;
262     check_interval_poll!(i, start);
263 
264     i.reset_after(Duration::from_millis(20));
265 
266     // We add one because when using `reset` method, `Interval` adds the
267     // `period` from `Instant::now()`, which will always be off by one
268     time::advance(ms(20)).await;
269     check_interval_poll!(i, start, 421);
270 
271     time::advance(ms(100)).await;
272     check_interval_poll!(i, start);
273 
274     time::advance(ms(200)).await;
275     check_interval_poll!(i, start, 721);
276 }
277 
278 #[tokio::test(start_paused = true)]
reset_at()279 async fn reset_at() {
280     let start = Instant::now();
281 
282     // This is necessary because the timer is only so granular, and in order for
283     // all our ticks to resolve, the time needs to be 1ms ahead of what we
284     // expect, so that the runtime will see that it is time to resolve the timer
285     time::advance(ms(1)).await;
286 
287     let mut i = task::spawn(time::interval_at(start, ms(300)));
288 
289     check_interval_poll!(i, start, 0);
290 
291     time::advance(ms(100)).await;
292     check_interval_poll!(i, start);
293 
294     time::advance(ms(200)).await;
295     check_interval_poll!(i, start, 300);
296 
297     time::advance(ms(100)).await;
298     check_interval_poll!(i, start);
299 
300     i.reset_at(Instant::now() + Duration::from_millis(40));
301 
302     // We add one because when using `reset` method, `Interval` adds the
303     // `period` from `Instant::now()`, which will always be off by one
304     time::advance(ms(40)).await;
305     check_interval_poll!(i, start, 441);
306 
307     time::advance(ms(100)).await;
308     check_interval_poll!(i, start);
309 
310     time::advance(ms(200)).await;
311     check_interval_poll!(i, start, 741);
312 }
313 
314 #[tokio::test(start_paused = true)]
reset_at_bigger_than_interval()315 async fn reset_at_bigger_than_interval() {
316     let start = Instant::now();
317 
318     // This is necessary because the timer is only so granular, and in order for
319     // all our ticks to resolve, the time needs to be 1ms ahead of what we
320     // expect, so that the runtime will see that it is time to resolve the timer
321     time::advance(ms(1)).await;
322 
323     let mut i = task::spawn(time::interval_at(start, ms(300)));
324 
325     check_interval_poll!(i, start, 0);
326 
327     time::advance(ms(100)).await;
328     check_interval_poll!(i, start);
329 
330     time::advance(ms(200)).await;
331     check_interval_poll!(i, start, 300);
332 
333     time::advance(ms(100)).await;
334     check_interval_poll!(i, start);
335 
336     i.reset_at(Instant::now() + Duration::from_millis(1000));
337 
338     // Validate the interval does not tick until 1000ms have passed
339     time::advance(ms(300)).await;
340     check_interval_poll!(i, start);
341     time::advance(ms(300)).await;
342     check_interval_poll!(i, start);
343     time::advance(ms(300)).await;
344     check_interval_poll!(i, start);
345 
346     // We add one because when using `reset` method, `Interval` adds the
347     // `period` from `Instant::now()`, which will always be off by one
348     time::advance(ms(100)).await;
349     check_interval_poll!(i, start, 1401);
350 
351     time::advance(ms(300)).await;
352     check_interval_poll!(i, start, 1701);
353 }
354 
poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant>355 fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
356     interval.enter(|cx, mut interval| interval.poll_tick(cx))
357 }
358 
ms(n: u64) -> Duration359 fn ms(n: u64) -> Duration {
360     Duration::from_millis(n)
361 }
362 
363 /// Helper struct to test the [tokio::time::Interval::poll_tick()] method.
364 ///
365 /// `poll_tick()` should register the waker in the context only if it returns
366 /// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an
367 /// interval timer and counts up on every tick when used as stream. When the
368 /// counter is a multiple of four, it yields the current counter value.
369 /// Depending on the value for `wake_on_pending`, it will reschedule itself when
370 /// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`,
371 /// we expect that the stream stalls because the timer will **not** reschedule
372 /// the next wake-up itself once it returned `Poll::Ready`.
373 struct IntervalStreamer {
374     counter: u32,
375     timer: Interval,
376     wake_on_pending: bool,
377 }
378 
379 impl Stream for IntervalStreamer {
380     type Item = u32;
381 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>382     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
383         let this = Pin::into_inner(self);
384 
385         if this.counter > 12 {
386             return Poll::Ready(None);
387         }
388 
389         match this.timer.poll_tick(cx) {
390             Poll::Pending => Poll::Pending,
391             Poll::Ready(_) => {
392                 this.counter += 1;
393                 if this.counter % 4 == 0 {
394                     Poll::Ready(Some(this.counter))
395                 } else {
396                     if this.wake_on_pending {
397                         // Schedule this task for wake-up
398                         cx.waker().wake_by_ref();
399                     }
400                     Poll::Pending
401                 }
402             }
403         }
404     }
405 }
406 
407 #[tokio::test(start_paused = true)]
stream_with_interval_poll_tick_self_waking()408 async fn stream_with_interval_poll_tick_self_waking() {
409     let stream = IntervalStreamer {
410         counter: 0,
411         timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
412         wake_on_pending: true,
413     };
414 
415     let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);
416 
417     // Wrap task in timeout so that it will finish eventually even if the stream
418     // stalls.
419     tokio::spawn(tokio::time::timeout(
420         tokio::time::Duration::from_millis(150),
421         async move {
422             tokio::pin!(stream);
423 
424             while let Some(item) = stream.next().await {
425                 res_tx.send(item).await.ok();
426             }
427         },
428     ));
429 
430     let mut items = Vec::with_capacity(3);
431     while let Some(result) = res_rx.recv().await {
432         items.push(result);
433     }
434 
435     // We expect the stream to yield normally and thus three items.
436     assert_eq!(items, vec![4, 8, 12]);
437 }
438 
439 #[tokio::test(start_paused = true)]
stream_with_interval_poll_tick_no_waking()440 async fn stream_with_interval_poll_tick_no_waking() {
441     let stream = IntervalStreamer {
442         counter: 0,
443         timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
444         wake_on_pending: false,
445     };
446 
447     let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);
448 
449     // Wrap task in timeout so that it will finish eventually even if the stream
450     // stalls.
451     tokio::spawn(tokio::time::timeout(
452         tokio::time::Duration::from_millis(150),
453         async move {
454             tokio::pin!(stream);
455 
456             while let Some(item) = stream.next().await {
457                 res_tx.send(item).await.ok();
458             }
459         },
460     ));
461 
462     let mut items = Vec::with_capacity(0);
463     while let Some(result) = res_rx.recv().await {
464         items.push(result);
465     }
466 
467     // We expect the stream to stall because it does not reschedule itself on
468     // `Poll::Pending` and neither does [tokio::time::Interval] reschedule the
469     // task when returning `Poll::Ready`.
470     assert_eq!(items, vec![]);
471 }
472 
473 #[tokio::test(start_paused = true)]
interval_doesnt_panic_max_duration_when_polling()474 async fn interval_doesnt_panic_max_duration_when_polling() {
475     let mut timer = task::spawn(time::interval(Duration::MAX));
476     assert_ready!(timer.enter(|cx, mut timer| timer.poll_tick(cx)));
477 }
478