1 #![warn(rust_2018_idioms)]
2 
3 use tokio::pin;
4 use tokio::sync::oneshot;
5 use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
6 
7 use core::future::Future;
8 use core::task::{Context, Poll};
9 use futures_test::task::new_count_waker;
10 
/// Cancelling a token wakes the waiter that was already polled and lets
/// every `cancelled()` future obtained from that token complete.
#[test]
fn cancel_token() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();
    assert!(!token.is_cancelled());

    // First waiter: polled once before cancellation, so it has been handed
    // the counting waker.
    let first_wait = token.cancelled();
    pin!(first_wait);
    assert!(first_wait.as_mut().poll(&mut cx).is_pending());
    assert_eq!(wake_counter, 0);

    // Second waiter: created and pinned, but deliberately not polled before
    // the token is cancelled.
    let second_wait = token.cancelled();
    pin!(second_wait);

    token.cancel();
    // Exactly one wake-up is expected: only one waiter was polled.
    assert_eq!(wake_counter, 1);
    assert!(token.is_cancelled());

    // Both waiters observe the cancellation on their next poll.
    assert!(first_wait.as_mut().poll(&mut cx).is_ready());
    assert!(second_wait.as_mut().poll(&mut cx).is_ready());
}
42 
/// Same scenario as `cancel_token`, but the waiters are created with
/// `cancelled_owned()` on clones of the token instead of borrowing it.
#[test]
fn cancel_token_owned() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();
    assert!(!token.is_cancelled());

    // First owned waiter: polled once, so it has been handed the waker.
    let first_wait = token.clone().cancelled_owned();
    pin!(first_wait);
    assert!(first_wait.as_mut().poll(&mut cx).is_pending());
    assert_eq!(wake_counter, 0);

    // Second owned waiter: not polled before the cancellation happens.
    let second_wait = token.clone().cancelled_owned();
    pin!(second_wait);

    token.cancel();
    // Exactly one wake-up is expected: only one waiter was polled.
    assert_eq!(wake_counter, 1);
    assert!(token.is_cancelled());

    // Both waiters resolve once polled after the cancellation.
    assert!(first_wait.as_mut().poll(&mut cx).is_ready());
    assert!(second_wait.as_mut().poll(&mut cx).is_ready());
}
74 
/// Polls a `cancelled_owned()` future once and then lets it go out of scope
/// while it is still pinned and pending.
#[test]
fn cancel_token_owned_drop_test() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    // The token is consumed by the owned future, so the future is its only
    // remaining handle in this test.
    let pending_wait = CancellationToken::new().cancelled_owned();
    pin!(pending_wait);

    assert!(pending_wait.as_mut().poll(&mut cx).is_pending());
    assert_eq!(wake_counter, 0);

    // The future is intentionally dropped here, pinned and never completed,
    // to surface potential memory-related bugs in its drop path.
}
92 
/// Cancelling a parent token also cancels its child and wakes the waiters
/// registered on both of them.
#[test]
fn cancel_child_token_through_parent() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();
    let child_token = token.child_token();
    assert!(!child_token.is_cancelled());

    let child_wait = child_token.cancelled();
    pin!(child_wait);
    let parent_wait = token.cancelled();
    pin!(parent_wait);

    // Poll both waiters once so each has been handed the waker.
    assert!(child_wait.as_mut().poll(&mut cx).is_pending());
    assert!(parent_wait.as_mut().poll(&mut cx).is_pending());
    assert_eq!(wake_counter, 0);

    token.cancel();
    // One wake-up per polled waiter: child and parent.
    assert_eq!(wake_counter, 2);
    assert!(token.is_cancelled());
    assert!(child_token.is_cancelled());

    assert!(child_wait.as_mut().poll(&mut cx).is_ready());
    assert!(parent_wait.as_mut().poll(&mut cx).is_ready());
}
130 
/// Cancelling the root token still reaches a grandchild token after the
/// intermediate token between them has been dropped.
#[test]
fn cancel_grandchild_token_through_parent_if_child_was_dropped() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();

    // Build root -> intermediate -> child, then drop the middle handle.
    let intermediate_token = token.child_token();
    let child_token = intermediate_token.child_token();
    drop(intermediate_token);
    assert!(!child_token.is_cancelled());

    let child_wait = child_token.cancelled();
    pin!(child_wait);
    let parent_wait = token.cancelled();
    pin!(parent_wait);

    // Poll both waiters once so each has been handed the waker.
    assert!(child_wait.as_mut().poll(&mut cx).is_pending());
    assert!(parent_wait.as_mut().poll(&mut cx).is_pending());
    assert_eq!(wake_counter, 0);

    token.cancel();
    // One wake-up per polled waiter: grandchild and root.
    assert_eq!(wake_counter, 2);
    assert!(token.is_cancelled());
    assert!(child_token.is_cancelled());

    assert!(child_wait.as_mut().poll(&mut cx).is_ready());
    assert!(parent_wait.as_mut().poll(&mut cx).is_ready());
}
170 
/// Cancelling a child token leaves its parent untouched; a later
/// cancellation of the parent still cancels a freshly created second child.
#[test]
fn cancel_child_token_without_parent() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();
    let child_token_1 = token.child_token();

    let first_child_wait = child_token_1.cancelled();
    pin!(first_child_wait);
    let parent_wait = token.cancelled();
    pin!(parent_wait);

    assert!(first_child_wait.as_mut().poll(&mut cx).is_pending());
    assert!(parent_wait.as_mut().poll(&mut cx).is_pending());
    assert_eq!(wake_counter, 0);

    // Phase 1: cancel only the first child.
    child_token_1.cancel();
    assert_eq!(wake_counter, 1);
    assert!(!token.is_cancelled());
    assert!(child_token_1.is_cancelled());

    // The child waiter completes while the parent waiter stays pending.
    assert!(first_child_wait.as_mut().poll(&mut cx).is_ready());
    assert!(parent_wait.as_mut().poll(&mut cx).is_pending());

    // Phase 2: attach a second child to the still-live parent.
    let child_token_2 = token.child_token();
    let second_child_wait = child_token_2.cancelled();
    pin!(second_child_wait);

    assert!(second_child_wait.as_mut().poll(&mut cx).is_pending());
    assert!(parent_wait.as_mut().poll(&mut cx).is_pending());

    token.cancel();
    // The count is cumulative: 1 from phase 1, plus the parent waiter and
    // the second child waiter.
    assert_eq!(wake_counter, 3);
    assert!(token.is_cancelled());
    assert!(child_token_2.is_cancelled());

    assert!(second_child_wait.as_mut().poll(&mut cx).is_ready());
    assert!(parent_wait.as_mut().poll(&mut cx).is_ready());
}
234 
/// A child created from an already-cancelled token starts out cancelled, and
/// the two tokens can be dropped in either order afterwards.
#[test]
fn create_child_token_after_parent_was_cancelled() {
    for &drop_child_first in &[true, false] {
        let (waker, wake_counter) = new_count_waker();
        let mut cx = Context::from_waker(&waker);

        let token = CancellationToken::new();
        token.cancel();

        let child_token = token.child_token();
        assert!(child_token.is_cancelled());

        // Scope the waiters so their borrows of the tokens end before the
        // tokens are dropped below.
        {
            let child_wait = child_token.cancelled();
            pin!(child_wait);
            let parent_wait = token.cancelled();
            pin!(parent_wait);

            // Both waiters are ready on the very first poll, and the waker
            // is never invoked.
            assert!(child_wait.as_mut().poll(&mut cx).is_ready());
            assert!(parent_wait.as_mut().poll(&mut cx).is_ready());
            assert_eq!(wake_counter, 0);
        }

        // Exercise both drop orders across the two loop iterations.
        let (dropped_first, dropped_second) = if drop_child_first {
            (child_token, token)
        } else {
            (token, child_token)
        };
        drop(dropped_first);
        drop(dropped_second);
    }
}
271 
/// Dropping several child tokens, front-to-back or back-to-front, never
/// cancels the parent token.
#[test]
fn drop_multiple_child_tokens() {
    for &drop_first_child_first in &[true, false] {
        let token = CancellationToken::new();
        let mut child_tokens = [
            Some(token.child_token()),
            Some(token.child_token()),
            Some(token.child_token()),
        ];

        assert!(!token.is_cancelled());
        assert!(!child_tokens[0].as_ref().unwrap().is_cancelled());

        let child_count = child_tokens.len();
        for step in 0..child_count {
            // Walk the slots forwards or backwards depending on the mode.
            let slot = if drop_first_child_first {
                step
            } else {
                child_count - 1 - step
            };
            // Overwriting the slot drops the child token stored in it.
            child_tokens[slot] = None;
            assert!(!token.is_cancelled());
        }

        drop(token);
    }
}
296 
/// Cancelling a token in the middle of a tree cancels that token and all of
/// its descendants, while its parent and its sibling stay uncancelled.
#[test]
fn cancel_only_all_descendants() {
    // ARRANGE
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    // Tree under test:
    //   parent_token
    //   |- token
    //   |  |- child1_token
    //   |  |  |- grandchild_token
    //   |  |  |  `- great_grandchild_token
    //   |  |  `- grandchild2_token
    //   |  `- child2_token
    //   `- sibling_token
    let parent_token = CancellationToken::new();
    let token = parent_token.child_token();
    let sibling_token = parent_token.child_token();
    let child1_token = token.child_token();
    let child2_token = token.child_token();
    let grandchild_token = child1_token.child_token();
    let grandchild2_token = child1_token.child_token();
    let great_grandchild_token = grandchild_token.child_token();

    assert!(!parent_token.is_cancelled());
    assert!(!token.is_cancelled());
    assert!(!sibling_token.is_cancelled());
    assert!(!child1_token.is_cancelled());
    assert!(!child2_token.is_cancelled());
    assert!(!grandchild_token.is_cancelled());
    assert!(!grandchild2_token.is_cancelled());
    assert!(!great_grandchild_token.is_cancelled());

    let parent_fut = parent_token.cancelled();
    let fut = token.cancelled();
    let sibling_fut = sibling_token.cancelled();
    let child1_fut = child1_token.cancelled();
    let child2_fut = child2_token.cancelled();
    let grandchild_fut = grandchild_token.cancelled();
    let grandchild2_fut = grandchild2_token.cancelled();
    let great_grandchild_fut = great_grandchild_token.cancelled();

    pin!(parent_fut);
    pin!(fut);
    pin!(sibling_fut);
    pin!(child1_fut);
    pin!(child2_fut);
    pin!(grandchild_fut);
    pin!(grandchild2_fut);
    pin!(great_grandchild_fut);

    // Poll every waiter once so each has been handed the waker.
    assert!(parent_fut.as_mut().poll(&mut cx).is_pending());
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    assert!(sibling_fut.as_mut().poll(&mut cx).is_pending());
    assert!(child1_fut.as_mut().poll(&mut cx).is_pending());
    assert!(child2_fut.as_mut().poll(&mut cx).is_pending());
    assert!(grandchild_fut.as_mut().poll(&mut cx).is_pending());
    assert!(grandchild2_fut.as_mut().poll(&mut cx).is_pending());
    assert!(great_grandchild_fut.as_mut().poll(&mut cx).is_pending());
    assert_eq!(wake_counter, 0);

    // ACT
    token.cancel();

    // ASSERT
    // Six wake-ups: `token` plus its five descendants. The parent and the
    // sibling waiters are not woken.
    assert_eq!(wake_counter, 6);
    assert!(!parent_token.is_cancelled());
    assert!(token.is_cancelled());
    assert!(!sibling_token.is_cancelled());
    assert!(child1_token.is_cancelled());
    assert!(child2_token.is_cancelled());
    assert!(grandchild_token.is_cancelled());
    assert!(grandchild2_token.is_cancelled());
    assert!(great_grandchild_token.is_cancelled());

    assert!(fut.as_mut().poll(&mut cx).is_ready());
    assert!(child1_fut.as_mut().poll(&mut cx).is_ready());
    assert!(child2_fut.as_mut().poll(&mut cx).is_ready());
    assert!(grandchild_fut.as_mut().poll(&mut cx).is_ready());
    assert!(grandchild2_fut.as_mut().poll(&mut cx).is_ready());
    assert!(great_grandchild_fut.as_mut().poll(&mut cx).is_ready());
    // Polling the completed waiters must not produce further wake-ups.
    assert_eq!(wake_counter, 6);
}
424 
/// Dropping the parent token before its children does not cancel the
/// children, and the children can be dropped afterwards.
#[test]
fn drop_parent_before_child_tokens() {
    let parent = CancellationToken::new();
    let (first_child, second_child) = (parent.child_token(), parent.child_token());

    drop(parent);
    assert!(!first_child.is_cancelled());

    drop(first_child);
    drop(second_child);
}
437 
/// Compile-time check that `CancellationToken` and
/// `WaitForCancellationFuture` are both `Send` and `Sync`.
#[test]
fn derives_send_sync() {
    // Only type-checks when `T` satisfies both auto traits; the body is
    // intentionally empty.
    fn assert_send_and_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_and_sync::<CancellationToken>();
    assert_send_and_sync::<WaitForCancellationFuture<'static>>();
}
449 
/// `run_until_cancelled` yields `None` when the token is cancelled first and
/// `Some(output)` when the wrapped future finishes first.
#[test]
fn run_until_cancelled_test() {
    let (waker, _) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    // Case 1: the inner future never completes, so only cancellation can
    // finish the combined future.
    {
        let token = CancellationToken::new();

        let guarded = token.run_until_cancelled(std::future::pending::<()>());
        pin!(guarded);

        assert_eq!(guarded.as_mut().poll(&mut cx), Poll::Pending);

        token.cancel();

        assert_eq!(guarded.as_mut().poll(&mut cx), Poll::Ready(None));
    }

    // Case 2: the inner future completes once the oneshot fires, and the
    // token is never cancelled.
    {
        let (tx, rx) = oneshot::channel::<()>();

        let token = CancellationToken::new();
        let guarded = token.run_until_cancelled(async move {
            rx.await.unwrap();
            42
        });
        pin!(guarded);

        assert_eq!(guarded.as_mut().poll(&mut cx), Poll::Pending);

        tx.send(()).unwrap();

        assert_eq!(guarded.as_mut().poll(&mut cx), Poll::Ready(Some(42)));
    }
}
496