1 #![warn(rust_2018_idioms)]
2
3 use tokio::pin;
4 use tokio::sync::oneshot;
5 use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
6
7 use core::future::Future;
8 use core::task::{Context, Poll};
9 use futures_test::task::new_count_waker;
10
/// Cancelling a token produces exactly one wake-up for the single waiter that
/// was polled beforehand, and afterwards every `cancelled()` future resolves,
/// including one that was never polled before the cancellation.
#[test]
fn cancel_token() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();
    assert!(!token.is_cancelled());

    // First waiter: polled while the token is still live.
    let first_wait = token.cancelled();
    pin!(first_wait);
    assert_eq!(Poll::Pending, first_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // Second waiter: created but deliberately left unpolled until after `cancel()`.
    let second_wait = token.cancelled();
    pin!(second_wait);

    token.cancel();
    assert_eq!(wake_counter, 1);
    assert!(token.is_cancelled());

    assert_eq!(Poll::Ready(()), first_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), second_wait.as_mut().poll(&mut cx));
}
42
/// Same scenario as the borrowed variant, but every waiter is obtained through
/// `cancelled_owned()` on a clone of the token.
#[test]
fn cancel_token_owned() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let token = CancellationToken::new();
    assert!(!token.is_cancelled());

    // First owned waiter: polled while the token is still live.
    let first_wait = token.clone().cancelled_owned();
    pin!(first_wait);
    assert_eq!(Poll::Pending, first_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // Second owned waiter: left unpolled until after `cancel()`.
    let second_wait = token.clone().cancelled_owned();
    pin!(second_wait);

    token.cancel();
    assert_eq!(wake_counter, 1);
    assert!(token.is_cancelled());

    assert_eq!(Poll::Ready(()), first_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), second_wait.as_mut().poll(&mut cx));
}
74
/// Polls a `cancelled_owned()` future once and then lets it go out of scope
/// without the token ever being cancelled.
#[test]
fn cancel_token_owned_drop_test() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let owned_wait = CancellationToken::new().cancelled_owned();
    pin!(owned_wait);

    assert_eq!(Poll::Pending, owned_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // Nothing else to do: the point is that the future is dropped here while
    // it is pinned and still pending, to surface potential memory-related bugs.
}
92
/// Cancelling a parent token also cancels its child: both pending waiters are
/// woken (two wake-ups in total) and both futures resolve.
#[test]
fn cancel_child_token_through_parent() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let token = CancellationToken::new();

    let child_token = token.child_token();
    assert!(!child_token.is_cancelled());

    let child_wait = child_token.cancelled();
    pin!(child_wait);
    let parent_wait = token.cancelled();
    pin!(parent_wait);

    // Poll both waiters once while nothing has been cancelled yet.
    assert_eq!(Poll::Pending, child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    token.cancel();
    assert_eq!(wake_counter, 2);
    assert!(token.is_cancelled());
    assert!(child_token.is_cancelled());

    assert_eq!(Poll::Ready(()), child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), parent_wait.as_mut().poll(&mut cx));
}
130
/// A grandchild token is still cancelled through the root token even though
/// the intermediate token that created it has already been dropped.
#[test]
fn cancel_grandchild_token_through_parent_if_child_was_dropped() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let token = CancellationToken::new();

    // Build root -> intermediate -> child, then discard the middle link.
    let intermediate_token = token.child_token();
    let child_token = intermediate_token.child_token();
    drop(intermediate_token);
    assert!(!child_token.is_cancelled());

    let child_wait = child_token.cancelled();
    pin!(child_wait);
    let parent_wait = token.cancelled();
    pin!(parent_wait);

    assert_eq!(Poll::Pending, child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    token.cancel();
    assert_eq!(wake_counter, 2);
    assert!(token.is_cancelled());
    assert!(child_token.is_cancelled());

    assert_eq!(Poll::Ready(()), child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), parent_wait.as_mut().poll(&mut cx));
}
170
/// Cancelling a child leaves its parent untouched; a later cancellation of the
/// parent then reaches the parent's own waiter and a newly created child.
#[test]
fn cancel_child_token_without_parent() {
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let token = CancellationToken::new();

    let first_child = token.child_token();

    let first_child_wait = first_child.cancelled();
    pin!(first_child_wait);
    let parent_wait = token.cancelled();
    pin!(parent_wait);

    assert_eq!(Poll::Pending, first_child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // Phase 1: cancel only the child. The parent must stay live and pending.
    first_child.cancel();
    assert_eq!(wake_counter, 1);
    assert!(!token.is_cancelled());
    assert!(first_child.is_cancelled());

    assert_eq!(Poll::Ready(()), first_child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));

    // Phase 2: attach a fresh child to the still-live parent.
    let second_child = token.child_token();
    let second_child_wait = second_child.cancelled();
    pin!(second_child_wait);

    assert_eq!(Poll::Pending, second_child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));

    // Phase 3: cancel the parent. Two more wake-ups are expected on top of
    // the one from phase 1.
    token.cancel();
    assert_eq!(wake_counter, 3);
    assert!(token.is_cancelled());
    assert!(second_child.is_cancelled());

    assert_eq!(Poll::Ready(()), second_child_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), parent_wait.as_mut().poll(&mut cx));
}
234
/// A child created from an already-cancelled parent starts out cancelled, its
/// waiter resolves on the first poll without any wake-up, and parent and child
/// can be dropped in either order.
#[test]
fn create_child_token_after_parent_was_cancelled() {
    for &drop_child_first in &[true, false] {
        let (waker, wake_counter) = new_count_waker();
        let mut cx = Context::from_waker(&waker);

        let token = CancellationToken::new();
        token.cancel();

        let child_token = token.child_token();
        assert!(child_token.is_cancelled());

        // Scope the waiters so they are gone before the tokens are dropped below.
        {
            let child_wait = child_token.cancelled();
            pin!(child_wait);
            let parent_wait = token.cancelled();
            pin!(parent_wait);

            assert_eq!(Poll::Ready(()), child_wait.as_mut().poll(&mut cx));
            assert_eq!(Poll::Ready(()), parent_wait.as_mut().poll(&mut cx));
            assert_eq!(wake_counter, 0);
        }

        // Exercise both teardown orders.
        if drop_child_first {
            drop(child_token);
            drop(token);
        } else {
            drop(token);
            drop(child_token);
        }
    }
}
271
/// Dropping three child tokens one by one, front-to-back or back-to-front,
/// never cancels the parent.
#[test]
fn drop_multiple_child_tokens() {
    for &drop_first_child_first in &[true, false] {
        let token = CancellationToken::new();
        let mut children = [
            Some(token.child_token()),
            Some(token.child_token()),
            Some(token.child_token()),
        ];

        assert!(!token.is_cancelled());
        assert!(!children[0].as_ref().unwrap().is_cancelled());

        let child_count = children.len();
        for step in 0..child_count {
            // Walk the slots forwards or backwards depending on the variant.
            let slot = if drop_first_child_first {
                step
            } else {
                child_count - 1 - step
            };
            // Overwriting the slot drops the child token it held.
            children[slot] = None;
            assert!(!token.is_cancelled());
        }

        drop(token);
    }
}
296
/// Cancelling a token in the middle of a tree cancels that token and all of
/// its descendants (six waiters woken), while its parent and its sibling stay
/// uncancelled.
#[test]
fn cancel_only_all_descendants() {
    // ARRANGE
    let (waker, wake_counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    // Tree layout:
    //   parent_token
    //   |- token                     <- the one that gets cancelled
    //   |  |- child1_token
    //   |  |  |- grandchild_token
    //   |  |  |  `- great_grandchild_token
    //   |  |  `- grandchild2_token
    //   |  `- child2_token
    //   `- sibling_token
    let parent_token = CancellationToken::new();
    let token = parent_token.child_token();
    let sibling_token = parent_token.child_token();
    let child1_token = token.child_token();
    let child2_token = token.child_token();
    let grandchild_token = child1_token.child_token();
    let grandchild2_token = child1_token.child_token();
    let great_grandchild_token = grandchild_token.child_token();

    assert!(!parent_token.is_cancelled());
    assert!(!token.is_cancelled());
    assert!(!sibling_token.is_cancelled());
    assert!(!child1_token.is_cancelled());
    assert!(!child2_token.is_cancelled());
    assert!(!grandchild_token.is_cancelled());
    assert!(!grandchild2_token.is_cancelled());
    assert!(!great_grandchild_token.is_cancelled());

    let parent_wait = parent_token.cancelled();
    let own_wait = token.cancelled();
    let sibling_wait = sibling_token.cancelled();
    let child1_wait = child1_token.cancelled();
    let child2_wait = child2_token.cancelled();
    let grandchild_wait = grandchild_token.cancelled();
    let grandchild2_wait = grandchild2_token.cancelled();
    let great_grandchild_wait = great_grandchild_token.cancelled();

    pin!(parent_wait);
    pin!(own_wait);
    pin!(sibling_wait);
    pin!(child1_wait);
    pin!(child2_wait);
    pin!(grandchild_wait);
    pin!(grandchild2_wait);
    pin!(great_grandchild_wait);

    // Poll every waiter once while the whole tree is still live.
    assert_eq!(Poll::Pending, parent_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, own_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, sibling_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, child1_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, child2_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, grandchild_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, grandchild2_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Pending, great_grandchild_wait.as_mut().poll(&mut cx));
    assert_eq!(wake_counter, 0);

    // ACT
    token.cancel();

    // ASSERT
    // Six waiters sit at or below `token`; the parent and sibling waiters do not.
    assert_eq!(wake_counter, 6);
    assert!(!parent_token.is_cancelled());
    assert!(token.is_cancelled());
    assert!(!sibling_token.is_cancelled());
    assert!(child1_token.is_cancelled());
    assert!(child2_token.is_cancelled());
    assert!(grandchild_token.is_cancelled());
    assert!(grandchild2_token.is_cancelled());
    assert!(great_grandchild_token.is_cancelled());

    assert_eq!(Poll::Ready(()), own_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), child1_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), child2_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), grandchild_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), grandchild2_wait.as_mut().poll(&mut cx));
    assert_eq!(Poll::Ready(()), great_grandchild_wait.as_mut().poll(&mut cx));
    // Polling the resolved futures must not trigger further wake-ups.
    assert_eq!(wake_counter, 6);
}
424
/// Dropping the parent token ahead of its children does not cancel them, and
/// the children can still be dropped afterwards.
#[test]
fn drop_parent_before_child_tokens() {
    let parent = CancellationToken::new();
    let first_child = parent.child_token();
    let second_child = parent.child_token();

    // The parent goes away first; the children are still held.
    drop(parent);
    assert!(!first_child.is_cancelled());

    drop(first_child);
    drop(second_child);
}
437
/// Compile-time check that the token and its borrowed wait future are both
/// `Send` and `Sync`; the test body has no runtime work to do.
#[test]
fn derives_send_sync() {
    // Instantiating this helper only type-checks when `T` meets both bounds.
    fn require_send_and_sync<T: Send + Sync>() {}

    require_send_and_sync::<CancellationToken>();
    require_send_and_sync::<WaitForCancellationFuture<'static>>();
}
449
/// `run_until_cancelled` yields `None` when the token is cancelled before the
/// wrapped future finishes, and `Some(output)` when the wrapped future
/// completes first.
#[test]
fn run_until_cancelled_test() {
    let (waker, _) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    // Case 1: the inner future never completes, so only cancellation can end it.
    {
        let token = CancellationToken::new();

        let guarded = token.run_until_cancelled(std::future::pending::<()>());
        pin!(guarded);

        assert_eq!(Poll::Pending, guarded.as_mut().poll(&mut cx));

        token.cancel();

        assert_eq!(Poll::Ready(None), guarded.as_mut().poll(&mut cx));
    }

    // Case 2: the inner future completes once the oneshot fires; the token is
    // never cancelled.
    {
        let (tx, rx) = oneshot::channel::<()>();

        let token = CancellationToken::new();
        let guarded = token.run_until_cancelled(async move {
            rx.await.unwrap();
            42
        });
        pin!(guarded);

        assert_eq!(Poll::Pending, guarded.as_mut().poll(&mut cx));

        tx.send(()).unwrap();

        assert_eq!(Poll::Ready(Some(42)), guarded.as_mut().poll(&mut cx));
    }
}
496