1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6
7 use futures::{Stream, StreamExt};
8 use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior};
9 use tokio_test::{assert_pending, assert_ready, assert_ready_eq, task};
10
// Drives a spawned `Interval` task through one round of polling.
//
// Arguments: the `Interval` task, the `start` instant, and zero or more
// millisecond offsets. Every offset must be answered, in order, by a ready
// poll whose value equals `start + ms(offset)`. Once the offsets are used up,
// one more poll is made and it has to be pending.
macro_rules! check_interval_poll {
    // No offsets given: the only expectation is that the interval is pending.
    ($i:ident, $start:ident) => {
        assert_pending!(poll_next(&mut $i));
    };
    // One assertion per offset, followed by the final pending check.
    ($i:ident, $start:ident, $($delta:expr),* $(,)?) => {
        $(
            assert_ready_eq!(poll_next(&mut $i), $start + ms($delta));
        )*
        assert_pending!(poll_next(&mut $i));
    };
}
26
27 #[tokio::test]
28 #[should_panic]
interval_zero_duration()29 async fn interval_zero_duration() {
30 let _ = time::interval_at(Instant::now(), ms(0));
31 }
32
33 // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
34 // Actual ticks: | work -----| delay | work | work | work -| work -----|
35 // Poll behavior: | | | | | | | |
36 // | | | | | | | |
37 // Ready(s) | | Ready(s + 2p) | | | |
38 // Pending | Ready(s + 3p) | | |
39 // Ready(s + p) Ready(s + 4p) | |
40 // Ready(s + 5p) |
41 // Ready(s + 6p)
42 #[tokio::test(start_paused = true)]
burst()43 async fn burst() {
44 let start = Instant::now();
45
46 // This is necessary because the timer is only so granular, and in order for
47 // all our ticks to resolve, the time needs to be 1ms ahead of what we
48 // expect, so that the runtime will see that it is time to resolve the timer
49 time::advance(ms(1)).await;
50
51 let mut i = task::spawn(time::interval_at(start, ms(300)));
52
53 check_interval_poll!(i, start, 0);
54
55 time::advance(ms(100)).await;
56 check_interval_poll!(i, start);
57
58 time::advance(ms(200)).await;
59 check_interval_poll!(i, start, 300);
60
61 time::advance(ms(650)).await;
62 check_interval_poll!(i, start, 600, 900);
63
64 time::advance(ms(200)).await;
65 check_interval_poll!(i, start);
66
67 time::advance(ms(100)).await;
68 check_interval_poll!(i, start, 1200);
69
70 time::advance(ms(250)).await;
71 check_interval_poll!(i, start, 1500);
72
73 time::advance(ms(300)).await;
74 check_interval_poll!(i, start, 1800);
75 }
76
77 // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
78 // Actual ticks: | work -----| delay | work -----| work -----| work -----|
79 // Poll behavior: | | | | | | | |
80 // | | | | | | | |
81 // Ready(s) | | Ready(s + 2p) | | | |
82 // Pending | Pending | | |
83 // Ready(s + p) Ready(s + 2p + d) | |
84 // Ready(s + 3p + d) |
85 // Ready(s + 4p + d)
86 #[tokio::test(start_paused = true)]
delay()87 async fn delay() {
88 let start = Instant::now();
89
90 // This is necessary because the timer is only so granular, and in order for
91 // all our ticks to resolve, the time needs to be 1ms ahead of what we
92 // expect, so that the runtime will see that it is time to resolve the timer
93 time::advance(ms(1)).await;
94
95 let mut i = task::spawn(time::interval_at(start, ms(300)));
96 i.set_missed_tick_behavior(MissedTickBehavior::Delay);
97
98 check_interval_poll!(i, start, 0);
99
100 time::advance(ms(100)).await;
101 check_interval_poll!(i, start);
102
103 time::advance(ms(200)).await;
104 check_interval_poll!(i, start, 300);
105
106 time::advance(ms(650)).await;
107 check_interval_poll!(i, start, 600);
108
109 time::advance(ms(100)).await;
110 check_interval_poll!(i, start);
111
112 // We have to add one here for the same reason as is above.
113 // Because `Interval` has reset its timer according to `Instant::now()`,
114 // we have to go forward 1 more millisecond than is expected so that the
115 // runtime realizes that it's time to resolve the timer.
116 time::advance(ms(201)).await;
117 // We add one because when using the `Delay` behavior, `Interval`
118 // adds the `period` from `Instant::now()`, which will always be off by one
119 // because we have to advance time by 1 (see above).
120 check_interval_poll!(i, start, 1251);
121
122 time::advance(ms(300)).await;
123 // Again, we add one.
124 check_interval_poll!(i, start, 1551);
125
126 time::advance(ms(300)).await;
127 check_interval_poll!(i, start, 1851);
128 }
129
130 // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
131 // Actual ticks: | work -----| delay | work ---| work -----| work -----|
132 // Poll behavior: | | | | | | |
133 // | | | | | | |
134 // Ready(s) | | Ready(s + 2p) | | |
135 // Pending | Ready(s + 4p) | |
136 // Ready(s + p) Ready(s + 5p) |
137 // Ready(s + 6p)
138 #[tokio::test(start_paused = true)]
skip()139 async fn skip() {
140 let start = Instant::now();
141
142 // This is necessary because the timer is only so granular, and in order for
143 // all our ticks to resolve, the time needs to be 1ms ahead of what we
144 // expect, so that the runtime will see that it is time to resolve the timer
145 time::advance(ms(1)).await;
146
147 let mut i = task::spawn(time::interval_at(start, ms(300)));
148 i.set_missed_tick_behavior(MissedTickBehavior::Skip);
149
150 check_interval_poll!(i, start, 0);
151
152 time::advance(ms(100)).await;
153 check_interval_poll!(i, start);
154
155 time::advance(ms(200)).await;
156 check_interval_poll!(i, start, 300);
157
158 time::advance(ms(650)).await;
159 check_interval_poll!(i, start, 600);
160
161 time::advance(ms(250)).await;
162 check_interval_poll!(i, start, 1200);
163
164 time::advance(ms(300)).await;
165 check_interval_poll!(i, start, 1500);
166
167 time::advance(ms(300)).await;
168 check_interval_poll!(i, start, 1800);
169 }
170
171 #[tokio::test(start_paused = true)]
reset()172 async fn reset() {
173 let start = Instant::now();
174
175 // This is necessary because the timer is only so granular, and in order for
176 // all our ticks to resolve, the time needs to be 1ms ahead of what we
177 // expect, so that the runtime will see that it is time to resolve the timer
178 time::advance(ms(1)).await;
179
180 let mut i = task::spawn(time::interval_at(start, ms(300)));
181
182 check_interval_poll!(i, start, 0);
183
184 time::advance(ms(100)).await;
185 check_interval_poll!(i, start);
186
187 time::advance(ms(200)).await;
188 check_interval_poll!(i, start, 300);
189
190 time::advance(ms(100)).await;
191 check_interval_poll!(i, start);
192
193 i.reset();
194
195 time::advance(ms(250)).await;
196 check_interval_poll!(i, start);
197
198 time::advance(ms(50)).await;
199 // We add one because when using `reset` method, `Interval` adds the
200 // `period` from `Instant::now()`, which will always be off by one
201 check_interval_poll!(i, start, 701);
202
203 time::advance(ms(300)).await;
204 check_interval_poll!(i, start, 1001);
205 }
206
207 #[tokio::test(start_paused = true)]
reset_immediately()208 async fn reset_immediately() {
209 let start = Instant::now();
210
211 // This is necessary because the timer is only so granular, and in order for
212 // all our ticks to resolve, the time needs to be 1ms ahead of what we
213 // expect, so that the runtime will see that it is time to resolve the timer
214 time::advance(ms(1)).await;
215
216 let mut i = task::spawn(time::interval_at(start, ms(300)));
217
218 check_interval_poll!(i, start, 0);
219
220 time::advance(ms(100)).await;
221 check_interval_poll!(i, start);
222
223 time::advance(ms(200)).await;
224 check_interval_poll!(i, start, 300);
225
226 time::advance(ms(100)).await;
227 check_interval_poll!(i, start);
228
229 i.reset_immediately();
230
231 // We add one because when using `reset` method, `Interval` adds the
232 // `period` from `Instant::now()`, which will always be off by one
233 check_interval_poll!(i, start, 401);
234
235 time::advance(ms(100)).await;
236 check_interval_poll!(i, start);
237
238 time::advance(ms(200)).await;
239 check_interval_poll!(i, start, 701);
240 }
241
242 #[tokio::test(start_paused = true)]
reset_after()243 async fn reset_after() {
244 let start = Instant::now();
245
246 // This is necessary because the timer is only so granular, and in order for
247 // all our ticks to resolve, the time needs to be 1ms ahead of what we
248 // expect, so that the runtime will see that it is time to resolve the timer
249 time::advance(ms(1)).await;
250
251 let mut i = task::spawn(time::interval_at(start, ms(300)));
252
253 check_interval_poll!(i, start, 0);
254
255 time::advance(ms(100)).await;
256 check_interval_poll!(i, start);
257
258 time::advance(ms(200)).await;
259 check_interval_poll!(i, start, 300);
260
261 time::advance(ms(100)).await;
262 check_interval_poll!(i, start);
263
264 i.reset_after(Duration::from_millis(20));
265
266 // We add one because when using `reset` method, `Interval` adds the
267 // `period` from `Instant::now()`, which will always be off by one
268 time::advance(ms(20)).await;
269 check_interval_poll!(i, start, 421);
270
271 time::advance(ms(100)).await;
272 check_interval_poll!(i, start);
273
274 time::advance(ms(200)).await;
275 check_interval_poll!(i, start, 721);
276 }
277
278 #[tokio::test(start_paused = true)]
reset_at()279 async fn reset_at() {
280 let start = Instant::now();
281
282 // This is necessary because the timer is only so granular, and in order for
283 // all our ticks to resolve, the time needs to be 1ms ahead of what we
284 // expect, so that the runtime will see that it is time to resolve the timer
285 time::advance(ms(1)).await;
286
287 let mut i = task::spawn(time::interval_at(start, ms(300)));
288
289 check_interval_poll!(i, start, 0);
290
291 time::advance(ms(100)).await;
292 check_interval_poll!(i, start);
293
294 time::advance(ms(200)).await;
295 check_interval_poll!(i, start, 300);
296
297 time::advance(ms(100)).await;
298 check_interval_poll!(i, start);
299
300 i.reset_at(Instant::now() + Duration::from_millis(40));
301
302 // We add one because when using `reset` method, `Interval` adds the
303 // `period` from `Instant::now()`, which will always be off by one
304 time::advance(ms(40)).await;
305 check_interval_poll!(i, start, 441);
306
307 time::advance(ms(100)).await;
308 check_interval_poll!(i, start);
309
310 time::advance(ms(200)).await;
311 check_interval_poll!(i, start, 741);
312 }
313
314 #[tokio::test(start_paused = true)]
reset_at_bigger_than_interval()315 async fn reset_at_bigger_than_interval() {
316 let start = Instant::now();
317
318 // This is necessary because the timer is only so granular, and in order for
319 // all our ticks to resolve, the time needs to be 1ms ahead of what we
320 // expect, so that the runtime will see that it is time to resolve the timer
321 time::advance(ms(1)).await;
322
323 let mut i = task::spawn(time::interval_at(start, ms(300)));
324
325 check_interval_poll!(i, start, 0);
326
327 time::advance(ms(100)).await;
328 check_interval_poll!(i, start);
329
330 time::advance(ms(200)).await;
331 check_interval_poll!(i, start, 300);
332
333 time::advance(ms(100)).await;
334 check_interval_poll!(i, start);
335
336 i.reset_at(Instant::now() + Duration::from_millis(1000));
337
338 // Validate the interval does not tick until 1000ms have passed
339 time::advance(ms(300)).await;
340 check_interval_poll!(i, start);
341 time::advance(ms(300)).await;
342 check_interval_poll!(i, start);
343 time::advance(ms(300)).await;
344 check_interval_poll!(i, start);
345
346 // We add one because when using `reset` method, `Interval` adds the
347 // `period` from `Instant::now()`, which will always be off by one
348 time::advance(ms(100)).await;
349 check_interval_poll!(i, start, 1401);
350
351 time::advance(ms(300)).await;
352 check_interval_poll!(i, start, 1701);
353 }
354
poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant>355 fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
356 interval.enter(|cx, mut interval| interval.poll_tick(cx))
357 }
358
/// Shorthand for a `Duration` of `n` milliseconds.
fn ms(n: u64) -> Duration {
    let millis = n;
    Duration::from_millis(millis)
}
362
/// Helper struct to test the [`tokio::time::Interval::poll_tick()`] method.
///
/// `poll_tick()` should register the waker in the context only if it returns
/// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an
/// interval timer and counts up on every tick when used as a stream. When the
/// counter is a multiple of four, it yields the current counter value.
/// Depending on the value of `wake_on_pending`, it will or will not reschedule
/// itself when it returns `Poll::Pending` after a tick. When used with
/// `wake_on_pending = false`, we expect the stream to stall, because the timer
/// will **not** schedule the next wake-up itself once it has returned
/// `Poll::Ready`.
struct IntervalStreamer {
    // Number of ticks observed so far.
    counter: u32,
    // The interval timer polled on every `poll_next` call.
    timer: Interval,
    // Whether to wake the task before returning `Poll::Pending` after a tick
    // that did not produce an item.
    wake_on_pending: bool,
}
378
379 impl Stream for IntervalStreamer {
380 type Item = u32;
381
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>382 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
383 let this = Pin::into_inner(self);
384
385 if this.counter > 12 {
386 return Poll::Ready(None);
387 }
388
389 match this.timer.poll_tick(cx) {
390 Poll::Pending => Poll::Pending,
391 Poll::Ready(_) => {
392 this.counter += 1;
393 if this.counter % 4 == 0 {
394 Poll::Ready(Some(this.counter))
395 } else {
396 if this.wake_on_pending {
397 // Schedule this task for wake-up
398 cx.waker().wake_by_ref();
399 }
400 Poll::Pending
401 }
402 }
403 }
404 }
405 }
406
407 #[tokio::test(start_paused = true)]
stream_with_interval_poll_tick_self_waking()408 async fn stream_with_interval_poll_tick_self_waking() {
409 let stream = IntervalStreamer {
410 counter: 0,
411 timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
412 wake_on_pending: true,
413 };
414
415 let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);
416
417 // Wrap task in timeout so that it will finish eventually even if the stream
418 // stalls.
419 tokio::spawn(tokio::time::timeout(
420 tokio::time::Duration::from_millis(150),
421 async move {
422 tokio::pin!(stream);
423
424 while let Some(item) = stream.next().await {
425 res_tx.send(item).await.ok();
426 }
427 },
428 ));
429
430 let mut items = Vec::with_capacity(3);
431 while let Some(result) = res_rx.recv().await {
432 items.push(result);
433 }
434
435 // We expect the stream to yield normally and thus three items.
436 assert_eq!(items, vec![4, 8, 12]);
437 }
438
439 #[tokio::test(start_paused = true)]
stream_with_interval_poll_tick_no_waking()440 async fn stream_with_interval_poll_tick_no_waking() {
441 let stream = IntervalStreamer {
442 counter: 0,
443 timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
444 wake_on_pending: false,
445 };
446
447 let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);
448
449 // Wrap task in timeout so that it will finish eventually even if the stream
450 // stalls.
451 tokio::spawn(tokio::time::timeout(
452 tokio::time::Duration::from_millis(150),
453 async move {
454 tokio::pin!(stream);
455
456 while let Some(item) = stream.next().await {
457 res_tx.send(item).await.ok();
458 }
459 },
460 ));
461
462 let mut items = Vec::with_capacity(0);
463 while let Some(result) = res_rx.recv().await {
464 items.push(result);
465 }
466
467 // We expect the stream to stall because it does not reschedule itself on
468 // `Poll::Pending` and neither does [tokio::time::Interval] reschedule the
469 // task when returning `Poll::Ready`.
470 assert_eq!(items, vec![]);
471 }
472
473 #[tokio::test(start_paused = true)]
interval_doesnt_panic_max_duration_when_polling()474 async fn interval_doesnt_panic_max_duration_when_polling() {
475 let mut timer = task::spawn(time::interval(Duration::MAX));
476 assert_ready!(timer.enter(|cx, mut timer| timer.poll_tick(cx)));
477 }
478