1 use std::fmt;
2 use std::future::Future;
3 use std::panic;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 
7 use crate::runtime::task::AbortHandle;
8 use crate::runtime::Builder;
9 use crate::sync::oneshot;
10 use crate::task::JoinHandle;
11 
12 use futures::future::FutureExt;
13 
14 // Enums for each option in the combinations being tested
15 
/// Which runtime the scenario's task is spawned on.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiRuntime {
    /// A runtime built with `Builder::new_current_thread()`.
    CurrentThread,
    /// A multi-thread runtime configured with one worker thread.
    Multi1,
    /// A multi-thread runtime configured with two worker threads.
    Multi2,
}
/// Whether the task is spawned and driven through a `LocalSet`.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiLocalSet {
    /// Spawn with `LocalSet::spawn_local` and block via `LocalSet::block_on`.
    Yes,
    /// Spawn and block directly on the runtime.
    No,
}
/// Where, if anywhere, the spawned future itself panics.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiTask {
    /// The task body panics instead of producing an output.
    PanicOnRun,
    /// The future wrapper panics from its `Drop` implementation.
    PanicOnDrop,
    /// Both of the above: panic while running and again when dropped.
    PanicOnRunAndDrop,
    /// The future neither panics while running nor when dropped.
    NoPanic,
}
/// Whether the value returned by the task panics when it is dropped.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiOutput {
    /// Dropping the output panics (unless the test disarms it first).
    PanicOnDrop,
    /// Dropping the output is silent.
    NoPanic,
}
/// Whether the `JoinHandle` is polled once right after the task is spawned.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinInterest {
    /// Poll the handle once up front; the poll is asserted to be pending.
    Polled,
    /// Do not poll the handle before the scenario proceeds.
    NotPolled,
}
/// The point in the scenario at which a handle is dropped.
///
/// Used directly for the `JoinHandle`, and wrapped in an `Option` to describe
/// the drop point of an additional abort handle.
///
/// The discriminants are explicit because `test_combination` compares them
/// numerically (`as usize`) with those of `CombiAbort` to skip scenarios in
/// which the join handle would already be gone when the abort is issued.
#[allow(clippy::enum_variant_names)] // we aren't using glob imports
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinHandle {
    /// Drop right after spawning, before waiting for the first poll.
    DropImmediately = 1,
    /// Drop once the task has signalled (or failed to signal) its first poll.
    DropFirstPoll = 2,
    /// Drop after the future has been dropped, without reading the output.
    DropAfterNoConsume = 3,
    /// Await the handle to obtain the result, then drop it.
    DropAfterConsume = 4,
}
/// The point in the scenario at which the task is aborted, if at all.
///
/// The discriminants line up with `CombiJoinHandle` so the two can be compared
/// numerically in `test_combination`.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiAbort {
    /// The task is never aborted.
    NotAborted = 0,
    /// Abort right after spawning, before waiting for the first poll.
    AbortedImmediately = 1,
    /// Abort after waiting for the first-poll signal.
    AbortedFirstPoll = 2,
    /// Abort after the future has already been dropped.
    AbortedAfterFinish = 3,
    /// Abort after the join handle has been awaited for the result.
    AbortedAfterConsumeOutput = 4,
}
60 
/// Which handle is used to issue the abort.
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiAbortSource {
    /// Call `abort()` on the `JoinHandle`.
    JoinHandle,
    /// Call `abort()` on an `AbortHandle` obtained from the `JoinHandle`.
    AbortHandle,
}
66 
/// Runs `test_combination` for every combination of runtime, local set, task
/// behaviour, output behaviour, join interest, handle drop points, abort point
/// and abort source.
#[test]
#[cfg_attr(panic = "abort", ignore)]
fn test_combinations() {
    // Under miri only the current-thread runtime is exercised.
    let runtimes: &[CombiRuntime] = if cfg!(miri) {
        &[CombiRuntime::CurrentThread]
    } else {
        &[
            CombiRuntime::CurrentThread,
            CombiRuntime::Multi1,
            CombiRuntime::Multi2,
        ]
    };

    let local_sets = [CombiLocalSet::Yes, CombiLocalSet::No];
    let tasks = [
        CombiTask::NoPanic,
        CombiTask::PanicOnRun,
        CombiTask::PanicOnDrop,
        CombiTask::PanicOnRunAndDrop,
    ];
    let outputs = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
    let join_interests = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
    let join_handle_drops = [
        CombiJoinHandle::DropImmediately,
        CombiJoinHandle::DropFirstPoll,
        CombiJoinHandle::DropAfterNoConsume,
        CombiJoinHandle::DropAfterConsume,
    ];
    let aborts = [
        CombiAbort::NotAborted,
        CombiAbort::AbortedImmediately,
        CombiAbort::AbortedFirstPoll,
        CombiAbort::AbortedAfterFinish,
        CombiAbort::AbortedAfterConsumeOutput,
    ];
    let abort_handle_drops = [
        None,
        Some(CombiJoinHandle::DropImmediately),
        Some(CombiJoinHandle::DropFirstPoll),
        Some(CombiJoinHandle::DropAfterNoConsume),
        Some(CombiJoinHandle::DropAfterConsume),
    ];

    for &rt in runtimes {
        for &ls in &local_sets {
            for &task in &tasks {
                for &output in &outputs {
                    for &ji in &join_interests {
                        for &jh in &join_handle_drops {
                            for &abort in &aborts {
                                // Aborting through the join handle: an extra
                                // abort handle may be dropped at any point.
                                for &ah in &abort_handle_drops {
                                    test_combination(
                                        rt,
                                        ls,
                                        task,
                                        output,
                                        ji,
                                        jh,
                                        ah,
                                        abort,
                                        CombiAbortSource::JoinHandle,
                                    );
                                }

                                // Aborting through the abort handle: it is
                                // never dropped ahead of time.
                                test_combination(
                                    rt,
                                    ls,
                                    task,
                                    output,
                                    ji,
                                    jh,
                                    None,
                                    abort,
                                    CombiAbortSource::AbortHandle,
                                );
                            }
                        }
                    }
                }
            }
        }
    }
}
153 
/// Compile-time check that `T` implements `fmt::Debug`; does nothing at runtime.
fn is_debug<T>(_: &T)
where
    T: fmt::Debug,
{
}
155 
156 #[allow(clippy::too_many_arguments)]
test_combination( rt: CombiRuntime, ls: CombiLocalSet, task: CombiTask, output: CombiOutput, ji: CombiJoinInterest, jh: CombiJoinHandle, ah: Option<CombiJoinHandle>, abort: CombiAbort, abort_src: CombiAbortSource, )157 fn test_combination(
158     rt: CombiRuntime,
159     ls: CombiLocalSet,
160     task: CombiTask,
161     output: CombiOutput,
162     ji: CombiJoinInterest,
163     jh: CombiJoinHandle,
164     ah: Option<CombiJoinHandle>,
165     abort: CombiAbort,
166     abort_src: CombiAbortSource,
167 ) {
168     match (abort_src, ah) {
169         (CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => {
170             // join handle dropped prior to abort
171             return;
172         }
173         (CombiAbortSource::AbortHandle, Some(_)) => {
174             // abort handle dropped, we can't abort through the
175             // abort handle
176             return;
177         }
178 
179         _ => {}
180     }
181 
182     if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
183         // this causes double panic
184         return;
185     }
186     if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
187         // this causes double panic
188         return;
189     }
190 
191     is_debug(&rt);
192     is_debug(&ls);
193     is_debug(&task);
194     is_debug(&output);
195     is_debug(&ji);
196     is_debug(&jh);
197     is_debug(&ah);
198     is_debug(&abort);
199     is_debug(&abort_src);
200 
201     // A runtime optionally with a LocalSet
202     struct Rt {
203         rt: crate::runtime::Runtime,
204         ls: Option<crate::task::LocalSet>,
205     }
206     impl Rt {
207         fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
208             let rt = match rt {
209                 CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
210                 CombiRuntime::Multi1 => Builder::new_multi_thread()
211                     .worker_threads(1)
212                     .build()
213                     .unwrap(),
214                 CombiRuntime::Multi2 => Builder::new_multi_thread()
215                     .worker_threads(2)
216                     .build()
217                     .unwrap(),
218             };
219 
220             let ls = match ls {
221                 CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
222                 CombiLocalSet::No => None,
223             };
224 
225             Self { rt, ls }
226         }
227         fn block_on<T>(&self, task: T) -> T::Output
228         where
229             T: Future,
230         {
231             match &self.ls {
232                 Some(ls) => ls.block_on(&self.rt, task),
233                 None => self.rt.block_on(task),
234             }
235         }
236         fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
237         where
238             T: Future + Send + 'static,
239             T::Output: Send + 'static,
240         {
241             match &self.ls {
242                 Some(ls) => ls.spawn_local(task),
243                 None => self.rt.spawn(task),
244             }
245         }
246     }
247 
248     // The type used for the output of the future
249     struct Output {
250         panic_on_drop: bool,
251         on_drop: Option<oneshot::Sender<()>>,
252     }
253     impl Output {
254         fn disarm(&mut self) {
255             self.panic_on_drop = false;
256         }
257     }
258     impl Drop for Output {
259         fn drop(&mut self) {
260             let _ = self.on_drop.take().unwrap().send(());
261             if self.panic_on_drop {
262                 panic!("Panicking in Output");
263             }
264         }
265     }
266 
267     // A wrapper around the future that is spawned
268     struct FutWrapper<F> {
269         inner: F,
270         on_drop: Option<oneshot::Sender<()>>,
271         panic_on_drop: bool,
272     }
273     impl<F: Future> Future for FutWrapper<F> {
274         type Output = F::Output;
275         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
276             unsafe {
277                 let me = Pin::into_inner_unchecked(self);
278                 let inner = Pin::new_unchecked(&mut me.inner);
279                 inner.poll(cx)
280             }
281         }
282     }
283     impl<F> Drop for FutWrapper<F> {
284         fn drop(&mut self) {
285             let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
286             if self.panic_on_drop {
287                 panic!("Panicking in FutWrapper");
288             }
289         }
290     }
291 
292     // The channels passed to the task
293     struct Signals {
294         on_first_poll: Option<oneshot::Sender<()>>,
295         wait_complete: Option<oneshot::Receiver<()>>,
296         on_output_drop: Option<oneshot::Sender<()>>,
297     }
298 
299     // The task we will spawn
300     async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
301         // Signal that we have been polled once
302         let _ = signal.on_first_poll.take().unwrap().send(());
303 
304         // Wait for a signal, then complete the future
305         let _ = signal.wait_complete.take().unwrap().await;
306 
307         // If the task gets past wait_complete without yielding, then aborts
308         // may not be caught without this yield_now.
309         crate::task::yield_now().await;
310 
311         if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
312             panic!("Panicking in my_task on {:?}", std::thread::current().id());
313         }
314 
315         Output {
316             panic_on_drop: out == CombiOutput::PanicOnDrop,
317             on_drop: signal.on_output_drop.take(),
318         }
319     }
320 
321     let rt = Rt::new(rt, ls);
322 
323     let (on_first_poll, wait_first_poll) = oneshot::channel();
324     let (on_complete, wait_complete) = oneshot::channel();
325     let (on_future_drop, wait_future_drop) = oneshot::channel();
326     let (on_output_drop, wait_output_drop) = oneshot::channel();
327     let signal = Signals {
328         on_first_poll: Some(on_first_poll),
329         wait_complete: Some(wait_complete),
330         on_output_drop: Some(on_output_drop),
331     };
332 
333     // === Spawn task ===
334     let mut handle = Some(rt.spawn(FutWrapper {
335         inner: my_task(signal, task, output),
336         on_drop: Some(on_future_drop),
337         panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
338     }));
339 
340     // Keep track of whether the task has been killed with an abort
341     let mut aborted = false;
342 
343     // If we want to poll the JoinHandle, do it now
344     if ji == CombiJoinInterest::Polled {
345         assert!(
346             handle.as_mut().unwrap().now_or_never().is_none(),
347             "Polling handle succeeded"
348         );
349     }
350 
351     // If we are either aborting the task via an abort handle, or dropping via
352     // an abort handle, do that now.
353     let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle {
354         handle.as_ref().map(JoinHandle::abort_handle)
355     } else {
356         None
357     };
358 
359     let do_abort = |abort_handle: &mut Option<AbortHandle>,
360                     join_handle: Option<&mut JoinHandle<_>>| {
361         match abort_src {
362             CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(),
363             CombiAbortSource::JoinHandle => join_handle.unwrap().abort(),
364         }
365     };
366 
367     if abort == CombiAbort::AbortedImmediately {
368         do_abort(&mut abort_handle, handle.as_mut());
369         aborted = true;
370     }
371     if jh == CombiJoinHandle::DropImmediately {
372         drop(handle.take().unwrap());
373     }
374 
375     // === Wait for first poll ===
376     let got_polled = rt.block_on(wait_first_poll).is_ok();
377     if !got_polled {
378         // it's possible that we are aborted but still got polled
379         assert!(
380             aborted,
381             "Task completed without ever being polled but was not aborted."
382         );
383     }
384 
385     if abort == CombiAbort::AbortedFirstPoll {
386         do_abort(&mut abort_handle, handle.as_mut());
387         aborted = true;
388     }
389     if jh == CombiJoinHandle::DropFirstPoll {
390         drop(handle.take().unwrap());
391     }
392     if ah == Some(CombiJoinHandle::DropFirstPoll) {
393         drop(abort_handle.take().unwrap());
394     }
395 
396     // Signal the future that it can return now
397     let _ = on_complete.send(());
398     // === Wait for future to be dropped ===
399     assert!(
400         rt.block_on(wait_future_drop).is_ok(),
401         "The future should always be dropped."
402     );
403 
404     if abort == CombiAbort::AbortedAfterFinish {
405         // Don't set aborted to true here as the task already finished
406         do_abort(&mut abort_handle, handle.as_mut());
407     }
408     if jh == CombiJoinHandle::DropAfterNoConsume {
409         if ah == Some(CombiJoinHandle::DropAfterNoConsume) {
410             drop(handle.take().unwrap());
411             // The runtime will usually have dropped every ref-count at this point,
412             // in which case dropping the AbortHandle drops the output.
413             //
414             // (But it might race and still hold a ref-count)
415             let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
416                 drop(abort_handle.take().unwrap());
417             }));
418             if panic.is_err() {
419                 assert!(
420                     (output == CombiOutput::PanicOnDrop)
421                         && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
422                         && !aborted,
423                     "Dropping AbortHandle shouldn't panic here"
424                 );
425             }
426         } else {
427             // The runtime will usually have dropped every ref-count at this point,
428             // in which case dropping the JoinHandle drops the output.
429             //
430             // (But it might race and still hold a ref-count)
431             let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
432                 drop(handle.take().unwrap());
433             }));
434             if panic.is_err() {
435                 assert!(
436                     (output == CombiOutput::PanicOnDrop)
437                         && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
438                         && !aborted,
439                     "Dropping JoinHandle shouldn't panic here"
440                 );
441             }
442         }
443     }
444 
445     // Check whether we drop after consuming the output
446     if jh == CombiJoinHandle::DropAfterConsume {
447         // Using as_mut() to not immediately drop the handle
448         let result = rt.block_on(handle.as_mut().unwrap());
449 
450         match result {
451             Ok(mut output) => {
452                 // Don't panic here.
453                 output.disarm();
454                 assert!(!aborted, "Task was aborted but returned output");
455             }
456             Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
457             Err(err) if err.is_panic() => {
458                 assert!(
459                     (task == CombiTask::PanicOnRun)
460                         || (task == CombiTask::PanicOnDrop)
461                         || (task == CombiTask::PanicOnRunAndDrop)
462                         || (output == CombiOutput::PanicOnDrop),
463                     "Panic but nothing should panic"
464                 );
465             }
466             _ => unreachable!(),
467         }
468 
469         let mut handle = handle.take().unwrap();
470         if abort == CombiAbort::AbortedAfterConsumeOutput {
471             do_abort(&mut abort_handle, Some(&mut handle));
472         }
473         drop(handle);
474 
475         if ah == Some(CombiJoinHandle::DropAfterConsume) {
476             drop(abort_handle.take());
477         }
478     }
479 
480     // The output should have been dropped now. Check whether the output
481     // object was created at all.
482     let output_created = rt.block_on(wait_output_drop).is_ok();
483     assert_eq!(
484         output_created,
485         (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
486         "Creation of output object"
487     );
488 }
489