1 use std::fmt;
2 use std::future::Future;
3 use std::panic;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6
7 use crate::runtime::task::AbortHandle;
8 use crate::runtime::Builder;
9 use crate::sync::oneshot;
10 use crate::task::JoinHandle;
11
12 use futures::future::FutureExt;
13
14 // Enums for each option in the combinations being tested
15
16 #[derive(Copy, Clone, Debug, PartialEq)]
17 enum CombiRuntime {
18 CurrentThread,
19 Multi1,
20 Multi2,
21 }
22 #[derive(Copy, Clone, Debug, PartialEq)]
23 enum CombiLocalSet {
24 Yes,
25 No,
26 }
27 #[derive(Copy, Clone, Debug, PartialEq)]
28 enum CombiTask {
29 PanicOnRun,
30 PanicOnDrop,
31 PanicOnRunAndDrop,
32 NoPanic,
33 }
34 #[derive(Copy, Clone, Debug, PartialEq)]
35 enum CombiOutput {
36 PanicOnDrop,
37 NoPanic,
38 }
39 #[derive(Copy, Clone, Debug, PartialEq)]
40 enum CombiJoinInterest {
41 Polled,
42 NotPolled,
43 }
44 #[allow(clippy::enum_variant_names)] // we aren't using glob imports
45 #[derive(Copy, Clone, Debug, PartialEq)]
46 enum CombiJoinHandle {
47 DropImmediately = 1,
48 DropFirstPoll = 2,
49 DropAfterNoConsume = 3,
50 DropAfterConsume = 4,
51 }
52 #[derive(Copy, Clone, Debug, PartialEq)]
53 enum CombiAbort {
54 NotAborted = 0,
55 AbortedImmediately = 1,
56 AbortedFirstPoll = 2,
57 AbortedAfterFinish = 3,
58 AbortedAfterConsumeOutput = 4,
59 }
60
61 #[derive(Copy, Clone, Debug, PartialEq)]
62 enum CombiAbortSource {
63 JoinHandle,
64 AbortHandle,
65 }
66
67 #[test]
68 #[cfg_attr(panic = "abort", ignore)]
test_combinations()69 fn test_combinations() {
70 let mut rt = &[
71 CombiRuntime::CurrentThread,
72 CombiRuntime::Multi1,
73 CombiRuntime::Multi2,
74 ][..];
75
76 if cfg!(miri) {
77 rt = &[CombiRuntime::CurrentThread];
78 }
79
80 let ls = [CombiLocalSet::Yes, CombiLocalSet::No];
81 let task = [
82 CombiTask::NoPanic,
83 CombiTask::PanicOnRun,
84 CombiTask::PanicOnDrop,
85 CombiTask::PanicOnRunAndDrop,
86 ];
87 let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
88 let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
89 let jh = [
90 CombiJoinHandle::DropImmediately,
91 CombiJoinHandle::DropFirstPoll,
92 CombiJoinHandle::DropAfterNoConsume,
93 CombiJoinHandle::DropAfterConsume,
94 ];
95 let abort = [
96 CombiAbort::NotAborted,
97 CombiAbort::AbortedImmediately,
98 CombiAbort::AbortedFirstPoll,
99 CombiAbort::AbortedAfterFinish,
100 CombiAbort::AbortedAfterConsumeOutput,
101 ];
102 let ah = [
103 None,
104 Some(CombiJoinHandle::DropImmediately),
105 Some(CombiJoinHandle::DropFirstPoll),
106 Some(CombiJoinHandle::DropAfterNoConsume),
107 Some(CombiJoinHandle::DropAfterConsume),
108 ];
109
110 for rt in rt.iter().copied() {
111 for ls in ls.iter().copied() {
112 for task in task.iter().copied() {
113 for output in output.iter().copied() {
114 for ji in ji.iter().copied() {
115 for jh in jh.iter().copied() {
116 for abort in abort.iter().copied() {
117 // abort via join handle --- abort handles
118 // may be dropped at any point
119 for ah in ah.iter().copied() {
120 test_combination(
121 rt,
122 ls,
123 task,
124 output,
125 ji,
126 jh,
127 ah,
128 abort,
129 CombiAbortSource::JoinHandle,
130 );
131 }
132 // if aborting via AbortHandle, it will
133 // never be dropped.
134 test_combination(
135 rt,
136 ls,
137 task,
138 output,
139 ji,
140 jh,
141 None,
142 abort,
143 CombiAbortSource::AbortHandle,
144 );
145 }
146 }
147 }
148 }
149 }
150 }
151 }
152 }
153
is_debug<T: fmt::Debug>(_: &T)154 fn is_debug<T: fmt::Debug>(_: &T) {}
155
156 #[allow(clippy::too_many_arguments)]
test_combination( rt: CombiRuntime, ls: CombiLocalSet, task: CombiTask, output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, ah: Option<CombiJoinHandle>, abort: CombiAbort, abort_src: CombiAbortSource, )157 fn test_combination(
158 rt: CombiRuntime,
159 ls: CombiLocalSet,
160 task: CombiTask,
161 output: CombiOutput,
162 ji: CombiJoinInterest,
163 jh: CombiJoinHandle,
164 ah: Option<CombiJoinHandle>,
165 abort: CombiAbort,
166 abort_src: CombiAbortSource,
167 ) {
168 match (abort_src, ah) {
169 (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => {
170 // join handle dropped prior to abort
171 return;
172 }
173 (CombiAbortSource::AbortHandle, Some(_)) => {
174 // abort handle dropped, we can't abort through the
175 // abort handle
176 return;
177 }
178
179 _ => {}
180 }
181
182 if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
183 // this causes double panic
184 return;
185 }
186 if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
187 // this causes double panic
188 return;
189 }
190
191 is_debug(&rt);
192 is_debug(&ls);
193 is_debug(&task);
194 is_debug(&output);
195 is_debug(&ji);
196 is_debug(&jh);
197 is_debug(&ah);
198 is_debug(&abort);
199 is_debug(&abort_src);
200
201 // A runtime optionally with a LocalSet
202 struct Rt {
203 rt: crate::runtime::Runtime,
204 ls: Option<crate::task::LocalSet>,
205 }
206 impl Rt {
207 fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
208 let rt = match rt {
209 CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
210 CombiRuntime::Multi1 => Builder::new_multi_thread()
211 .worker_threads(1)
212 .build()
213 .unwrap(),
214 CombiRuntime::Multi2 => Builder::new_multi_thread()
215 .worker_threads(2)
216 .build()
217 .unwrap(),
218 };
219
220 let ls = match ls {
221 CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
222 CombiLocalSet::No => None,
223 };
224
225 Self { rt, ls }
226 }
227 fn block_on<T>(&self, task: T) -> T::Output
228 where
229 T: Future,
230 {
231 match &self.ls {
232 Some(ls) => ls.block_on(&self.rt, task),
233 None => self.rt.block_on(task),
234 }
235 }
236 fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
237 where
238 T: Future + Send + 'static,
239 T::Output: Send + 'static,
240 {
241 match &self.ls {
242 Some(ls) => ls.spawn_local(task),
243 None => self.rt.spawn(task),
244 }
245 }
246 }
247
248 // The type used for the output of the future
249 struct Output {
250 panic_on_drop: bool,
251 on_drop: Option<oneshot::Sender<()>>,
252 }
253 impl Output {
254 fn disarm(&mut self) {
255 self.panic_on_drop = false;
256 }
257 }
258 impl Drop for Output {
259 fn drop(&mut self) {
260 let _ = self.on_drop.take().unwrap().send(());
261 if self.panic_on_drop {
262 panic!("Panicking in Output");
263 }
264 }
265 }
266
267 // A wrapper around the future that is spawned
268 struct FutWrapper<F> {
269 inner: F,
270 on_drop: Option<oneshot::Sender<()>>,
271 panic_on_drop: bool,
272 }
273 impl<F: Future> Future for FutWrapper<F> {
274 type Output = F::Output;
275 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
276 unsafe {
277 let me = Pin::into_inner_unchecked(self);
278 let inner = Pin::new_unchecked(&mut me.inner);
279 inner.poll(cx)
280 }
281 }
282 }
283 impl<F> Drop for FutWrapper<F> {
284 fn drop(&mut self) {
285 let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
286 if self.panic_on_drop {
287 panic!("Panicking in FutWrapper");
288 }
289 }
290 }
291
292 // The channels passed to the task
293 struct Signals {
294 on_first_poll: Option<oneshot::Sender<()>>,
295 wait_complete: Option<oneshot::Receiver<()>>,
296 on_output_drop: Option<oneshot::Sender<()>>,
297 }
298
299 // The task we will spawn
300 async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
301 // Signal that we have been polled once
302 let _ = signal.on_first_poll.take().unwrap().send(());
303
304 // Wait for a signal, then complete the future
305 let _ = signal.wait_complete.take().unwrap().await;
306
307 // If the task gets past wait_complete without yielding, then aborts
308 // may not be caught without this yield_now.
309 crate::task::yield_now().await;
310
311 if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
312 panic!("Panicking in my_task on {:?}", std::thread::current().id());
313 }
314
315 Output {
316 panic_on_drop: out == CombiOutput::PanicOnDrop,
317 on_drop: signal.on_output_drop.take(),
318 }
319 }
320
321 let rt = Rt::new(rt, ls);
322
323 let (on_first_poll, wait_first_poll) = oneshot::channel();
324 let (on_complete, wait_complete) = oneshot::channel();
325 let (on_future_drop, wait_future_drop) = oneshot::channel();
326 let (on_output_drop, wait_output_drop) = oneshot::channel();
327 let signal = Signals {
328 on_first_poll: Some(on_first_poll),
329 wait_complete: Some(wait_complete),
330 on_output_drop: Some(on_output_drop),
331 };
332
333 // === Spawn task ===
334 let mut handle = Some(rt.spawn(FutWrapper {
335 inner: my_task(signal, task, output),
336 on_drop: Some(on_future_drop),
337 panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
338 }));
339
340 // Keep track of whether the task has been killed with an abort
341 let mut aborted = false;
342
343 // If we want to poll the JoinHandle, do it now
344 if ji == CombiJoinInterest::Polled {
345 assert!(
346 handle.as_mut().unwrap().now_or_never().is_none(),
347 "Polling handle succeeded"
348 );
349 }
350
351 // If we are either aborting the task via an abort handle, or dropping via
352 // an abort handle, do that now.
353 let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle {
354 handle.as_ref().map(JoinHandle::abort_handle)
355 } else {
356 None
357 };
358
359 let do_abort = |abort_handle: &mut Option<AbortHandle>,
360 join_handle: Option<&mut JoinHandle<_>>| {
361 match abort_src {
362 CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(),
363 CombiAbortSource::JoinHandle => join_handle.unwrap().abort(),
364 }
365 };
366
367 if abort == CombiAbort::AbortedImmediately {
368 do_abort(&mut abort_handle, handle.as_mut());
369 aborted = true;
370 }
371 if jh == CombiJoinHandle::DropImmediately {
372 drop(handle.take().unwrap());
373 }
374
375 // === Wait for first poll ===
376 let got_polled = rt.block_on(wait_first_poll).is_ok();
377 if !got_polled {
378 // it's possible that we are aborted but still got polled
379 assert!(
380 aborted,
381 "Task completed without ever being polled but was not aborted."
382 );
383 }
384
385 if abort == CombiAbort::AbortedFirstPoll {
386 do_abort(&mut abort_handle, handle.as_mut());
387 aborted = true;
388 }
389 if jh == CombiJoinHandle::DropFirstPoll {
390 drop(handle.take().unwrap());
391 }
392 if ah == Some(CombiJoinHandle::DropFirstPoll) {
393 drop(abort_handle.take().unwrap());
394 }
395
396 // Signal the future that it can return now
397 let _ = on_complete.send(());
398 // === Wait for future to be dropped ===
399 assert!(
400 rt.block_on(wait_future_drop).is_ok(),
401 "The future should always be dropped."
402 );
403
404 if abort == CombiAbort::AbortedAfterFinish {
405 // Don't set aborted to true here as the task already finished
406 do_abort(&mut abort_handle, handle.as_mut());
407 }
408 if jh == CombiJoinHandle::DropAfterNoConsume {
409 if ah == Some(CombiJoinHandle::DropAfterNoConsume) {
410 drop(handle.take().unwrap());
411 // The runtime will usually have dropped every ref-count at this point,
412 // in which case dropping the AbortHandle drops the output.
413 //
414 // (But it might race and still hold a ref-count)
415 let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
416 drop(abort_handle.take().unwrap());
417 }));
418 if panic.is_err() {
419 assert!(
420 (output == CombiOutput::PanicOnDrop)
421 && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
422 && !aborted,
423 "Dropping AbortHandle shouldn't panic here"
424 );
425 }
426 } else {
427 // The runtime will usually have dropped every ref-count at this point,
428 // in which case dropping the JoinHandle drops the output.
429 //
430 // (But it might race and still hold a ref-count)
431 let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
432 drop(handle.take().unwrap());
433 }));
434 if panic.is_err() {
435 assert!(
436 (output == CombiOutput::PanicOnDrop)
437 && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
438 && !aborted,
439 "Dropping JoinHandle shouldn't panic here"
440 );
441 }
442 }
443 }
444
445 // Check whether we drop after consuming the output
446 if jh == CombiJoinHandle::DropAfterConsume {
447 // Using as_mut() to not immediately drop the handle
448 let result = rt.block_on(handle.as_mut().unwrap());
449
450 match result {
451 Ok(mut output) => {
452 // Don't panic here.
453 output.disarm();
454 assert!(!aborted, "Task was aborted but returned output");
455 }
456 Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
457 Err(err) if err.is_panic() => {
458 assert!(
459 (task == CombiTask::PanicOnRun)
460 || (task == CombiTask::PanicOnDrop)
461 || (task == CombiTask::PanicOnRunAndDrop)
462 || (output == CombiOutput::PanicOnDrop),
463 "Panic but nothing should panic"
464 );
465 }
466 _ => unreachable!(),
467 }
468
469 let mut handle = handle.take().unwrap();
470 if abort == CombiAbort::AbortedAfterConsumeOutput {
471 do_abort(&mut abort_handle, Some(&mut handle));
472 }
473 drop(handle);
474
475 if ah == Some(CombiJoinHandle::DropAfterConsume) {
476 drop(abort_handle.take());
477 }
478 }
479
480 // The output should have been dropped now. Check whether the output
481 // object was created at all.
482 let output_created = rt.block_on(wait_output_drop).is_ok();
483 assert_eq!(
484 output_created,
485 (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
486 "Creation of output object"
487 );
488 }
489