1 use crate::future::Future;
2 use crate::runtime::task::core::{Cell, Core, Header, Trailer};
3 use crate::runtime::task::state::{Snapshot, State};
4 use crate::runtime::task::waker::waker_ref;
5 use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};
6 
7 use crate::runtime::TaskMeta;
8 use std::any::Any;
9 use std::mem;
10 use std::mem::ManuallyDrop;
11 use std::panic;
12 use std::ptr::NonNull;
13 use std::task::{Context, Poll, Waker};
14 
15 /// Typed raw task handle.
16 pub(super) struct Harness<T: Future, S: 'static> {
17     cell: NonNull<Cell<T, S>>,
18 }
19 
20 impl<T, S> Harness<T, S>
21 where
22     T: Future,
23     S: 'static,
24 {
from_raw(ptr: NonNull<Header>) -> Harness<T, S>25     pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
26         Harness {
27             cell: ptr.cast::<Cell<T, S>>(),
28         }
29     }
30 
header_ptr(&self) -> NonNull<Header>31     fn header_ptr(&self) -> NonNull<Header> {
32         self.cell.cast()
33     }
34 
header(&self) -> &Header35     fn header(&self) -> &Header {
36         unsafe { &*self.header_ptr().as_ptr() }
37     }
38 
state(&self) -> &State39     fn state(&self) -> &State {
40         &self.header().state
41     }
42 
trailer(&self) -> &Trailer43     fn trailer(&self) -> &Trailer {
44         unsafe { &self.cell.as_ref().trailer }
45     }
46 
core(&self) -> &Core<T, S>47     fn core(&self) -> &Core<T, S> {
48         unsafe { &self.cell.as_ref().core }
49     }
50 }
51 
52 /// Task operations that can be implemented without being generic over the
53 /// scheduler or task. Only one version of these methods should exist in the
54 /// final binary.
55 impl RawTask {
drop_reference(self)56     pub(super) fn drop_reference(self) {
57         if self.state().ref_dec() {
58             self.dealloc();
59         }
60     }
61 
62     /// This call consumes a ref-count and notifies the task. This will create a
63     /// new Notified and submit it if necessary.
64     ///
65     /// The caller does not need to hold a ref-count besides the one that was
66     /// passed to this call.
wake_by_val(&self)67     pub(super) fn wake_by_val(&self) {
68         use super::state::TransitionToNotifiedByVal;
69 
70         match self.state().transition_to_notified_by_val() {
71             TransitionToNotifiedByVal::Submit => {
72                 // The caller has given us a ref-count, and the transition has
73                 // created a new ref-count, so we now hold two. We turn the new
74                 // ref-count Notified and pass it to the call to `schedule`.
75                 //
76                 // The old ref-count is retained for now to ensure that the task
77                 // is not dropped during the call to `schedule` if the call
78                 // drops the task it was given.
79                 self.schedule();
80 
81                 // Now that we have completed the call to schedule, we can
82                 // release our ref-count.
83                 self.drop_reference();
84             }
85             TransitionToNotifiedByVal::Dealloc => {
86                 self.dealloc();
87             }
88             TransitionToNotifiedByVal::DoNothing => {}
89         }
90     }
91 
92     /// This call notifies the task. It will not consume any ref-counts, but the
93     /// caller should hold a ref-count.  This will create a new Notified and
94     /// submit it if necessary.
wake_by_ref(&self)95     pub(super) fn wake_by_ref(&self) {
96         use super::state::TransitionToNotifiedByRef;
97 
98         match self.state().transition_to_notified_by_ref() {
99             TransitionToNotifiedByRef::Submit => {
100                 // The transition above incremented the ref-count for a new task
101                 // and the caller also holds a ref-count. The caller's ref-count
102                 // ensures that the task is not destroyed even if the new task
103                 // is dropped before `schedule` returns.
104                 self.schedule();
105             }
106             TransitionToNotifiedByRef::DoNothing => {}
107         }
108     }
109 
110     /// Remotely aborts the task.
111     ///
112     /// The caller should hold a ref-count, but we do not consume it.
113     ///
114     /// This is similar to `shutdown` except that it asks the runtime to perform
115     /// the shutdown. This is necessary to avoid the shutdown happening in the
116     /// wrong thread for non-Send tasks.
remote_abort(&self)117     pub(super) fn remote_abort(&self) {
118         if self.state().transition_to_notified_and_cancel() {
119             // The transition has created a new ref-count, which we turn into
120             // a Notified and pass to the task.
121             //
122             // Since the caller holds a ref-count, the task cannot be destroyed
123             // before the call to `schedule` returns even if the call drops the
124             // `Notified` internally.
125             self.schedule();
126         }
127     }
128 
129     /// Try to set the waker notified when the task is complete. Returns true if
130     /// the task has already completed. If this call returns false, then the
131     /// waker will not be notified.
try_set_join_waker(&self, waker: &Waker) -> bool132     pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
133         can_read_output(self.header(), self.trailer(), waker)
134     }
135 }
136 
137 impl<T, S> Harness<T, S>
138 where
139     T: Future,
140     S: Schedule,
141 {
drop_reference(self)142     pub(super) fn drop_reference(self) {
143         if self.state().ref_dec() {
144             self.dealloc();
145         }
146     }
147 
148     /// Polls the inner future. A ref-count is consumed.
149     ///
150     /// All necessary state checks and transitions are performed.
151     /// Panics raised while polling the future are handled.
poll(self)152     pub(super) fn poll(self) {
153         // We pass our ref-count to `poll_inner`.
154         match self.poll_inner() {
155             PollFuture::Notified => {
156                 // The `poll_inner` call has given us two ref-counts back.
157                 // We give one of them to a new task and call `yield_now`.
158                 self.core()
159                     .scheduler
160                     .yield_now(Notified(self.get_new_task()));
161 
162                 // The remaining ref-count is now dropped. We kept the extra
163                 // ref-count until now to ensure that even if the `yield_now`
164                 // call drops the provided task, the task isn't deallocated
165                 // before after `yield_now` returns.
166                 self.drop_reference();
167             }
168             PollFuture::Complete => {
169                 self.complete();
170             }
171             PollFuture::Dealloc => {
172                 self.dealloc();
173             }
174             PollFuture::Done => (),
175         }
176     }
177 
178     /// Polls the task and cancel it if necessary. This takes ownership of a
179     /// ref-count.
180     ///
181     /// If the return value is Notified, the caller is given ownership of two
182     /// ref-counts.
183     ///
184     /// If the return value is Complete, the caller is given ownership of a
185     /// single ref-count, which should be passed on to `complete`.
186     ///
187     /// If the return value is `Dealloc`, then this call consumed the last
188     /// ref-count and the caller should call `dealloc`.
189     ///
190     /// Otherwise the ref-count is consumed and the caller should not access
191     /// `self` again.
poll_inner(&self) -> PollFuture192     fn poll_inner(&self) -> PollFuture {
193         use super::state::{TransitionToIdle, TransitionToRunning};
194 
195         match self.state().transition_to_running() {
196             TransitionToRunning::Success => {
197                 // Separated to reduce LLVM codegen
198                 fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
199                     match result {
200                         TransitionToIdle::Ok => PollFuture::Done,
201                         TransitionToIdle::OkNotified => PollFuture::Notified,
202                         TransitionToIdle::OkDealloc => PollFuture::Dealloc,
203                         TransitionToIdle::Cancelled => PollFuture::Complete,
204                     }
205                 }
206                 let header_ptr = self.header_ptr();
207                 let waker_ref = waker_ref::<S>(&header_ptr);
208                 let cx = Context::from_waker(&waker_ref);
209                 let res = poll_future(self.core(), cx);
210 
211                 if res == Poll::Ready(()) {
212                     // The future completed. Move on to complete the task.
213                     return PollFuture::Complete;
214                 }
215 
216                 let transition_res = self.state().transition_to_idle();
217                 if let TransitionToIdle::Cancelled = transition_res {
218                     // The transition to idle failed because the task was
219                     // cancelled during the poll.
220                     cancel_task(self.core());
221                 }
222                 transition_result_to_poll_future(transition_res)
223             }
224             TransitionToRunning::Cancelled => {
225                 cancel_task(self.core());
226                 PollFuture::Complete
227             }
228             TransitionToRunning::Failed => PollFuture::Done,
229             TransitionToRunning::Dealloc => PollFuture::Dealloc,
230         }
231     }
232 
233     /// Forcibly shuts down the task.
234     ///
235     /// Attempt to transition to `Running` in order to forcibly shutdown the
236     /// task. If the task is currently running or in a state of completion, then
237     /// there is nothing further to do. When the task completes running, it will
238     /// notice the `CANCELLED` bit and finalize the task.
shutdown(self)239     pub(super) fn shutdown(self) {
240         if !self.state().transition_to_shutdown() {
241             // The task is concurrently running. No further work needed.
242             self.drop_reference();
243             return;
244         }
245 
246         // By transitioning the lifecycle to `Running`, we have permission to
247         // drop the future.
248         cancel_task(self.core());
249         self.complete();
250     }
251 
dealloc(self)252     pub(super) fn dealloc(self) {
253         // Observe that we expect to have mutable access to these objects
254         // because we are going to drop them. This only matters when running
255         // under loom.
256         self.trailer().waker.with_mut(|_| ());
257         self.core().stage.with_mut(|_| ());
258 
259         // Safety: The caller of this method just transitioned our ref-count to
260         // zero, so it is our responsibility to release the allocation.
261         //
262         // We don't hold any references into the allocation at this point, but
263         // it is possible for another thread to still hold a `&State` into the
264         // allocation if that other thread has decremented its last ref-count,
265         // but has not yet returned from the relevant method on `State`.
266         //
267         // However, the `State` type consists of just an `AtomicUsize`, and an
268         // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
269         // As explained in the documentation for `UnsafeCell`, such references
270         // are allowed to be dangling after their last use, even if the
271         // reference has not yet gone out of scope.
272         unsafe {
273             drop(Box::from_raw(self.cell.as_ptr()));
274         }
275     }
276 
277     // ===== join handle =====
278 
279     /// Read the task output into `dst`.
try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker)280     pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
281         if can_read_output(self.header(), self.trailer(), waker) {
282             *dst = Poll::Ready(self.core().take_output());
283         }
284     }
285 
drop_join_handle_slow(self)286     pub(super) fn drop_join_handle_slow(self) {
287         // Try to unset `JOIN_INTEREST`. This must be done as a first step in
288         // case the task concurrently completed.
289         if self.state().unset_join_interested().is_err() {
290             // It is our responsibility to drop the output. This is critical as
291             // the task output may not be `Send` and as such must remain with
292             // the scheduler or `JoinHandle`. i.e. if the output remains in the
293             // task structure until the task is deallocated, it may be dropped
294             // by a Waker on any arbitrary thread.
295             //
296             // Panics are delivered to the user via the `JoinHandle`. Given that
297             // they are dropping the `JoinHandle`, we assume they are not
298             // interested in the panic and swallow it.
299             let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
300                 self.core().drop_future_or_output();
301             }));
302         }
303 
304         // Drop the `JoinHandle` reference, possibly deallocating the task
305         self.drop_reference();
306     }
307 
308     // ====== internal ======
309 
310     /// Completes the task. This method assumes that the state is RUNNING.
complete(self)311     fn complete(self) {
312         // The future has completed and its output has been written to the task
313         // stage. We transition from running to complete.
314 
315         let snapshot = self.state().transition_to_complete();
316 
317         // We catch panics here in case dropping the future or waking the
318         // JoinHandle panics.
319         let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
320             if !snapshot.is_join_interested() {
321                 // The `JoinHandle` is not interested in the output of
322                 // this task. It is our responsibility to drop the
323                 // output.
324                 self.core().drop_future_or_output();
325             } else if snapshot.is_join_waker_set() {
326                 // Notify the waker. Reading the waker field is safe per rule 4
327                 // in task/mod.rs, since the JOIN_WAKER bit is set and the call
328                 // to transition_to_complete() above set the COMPLETE bit.
329                 self.trailer().wake_join();
330             }
331         }));
332 
333         // We catch panics here in case invoking a hook panics.
334         //
335         // We call this in a separate block so that it runs after the task appears to have
336         // completed and will still run if the destructor panics.
337         if let Some(f) = self.trailer().hooks.task_terminate_callback.as_ref() {
338             let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
339                 f(&TaskMeta {
340                     id: self.core().task_id,
341                     _phantom: Default::default(),
342                 })
343             }));
344         }
345 
346         // The task has completed execution and will no longer be scheduled.
347         let num_release = self.release();
348 
349         if self.state().transition_to_terminal(num_release) {
350             self.dealloc();
351         }
352     }
353 
354     /// Releases the task from the scheduler. Returns the number of ref-counts
355     /// that should be decremented.
release(&self) -> usize356     fn release(&self) -> usize {
357         // We don't actually increment the ref-count here, but the new task is
358         // never destroyed, so that's ok.
359         let me = ManuallyDrop::new(self.get_new_task());
360 
361         if let Some(task) = self.core().scheduler.release(&me) {
362             mem::forget(task);
363             2
364         } else {
365             1
366         }
367     }
368 
369     /// Creates a new task that holds its own ref-count.
370     ///
371     /// # Safety
372     ///
373     /// Any use of `self` after this call must ensure that a ref-count to the
374     /// task holds the task alive until after the use of `self`. Passing the
375     /// returned Task to any method on `self` is unsound if dropping the Task
376     /// could drop `self` before the call on `self` returned.
get_new_task(&self) -> Task<S>377     fn get_new_task(&self) -> Task<S> {
378         // safety: The header is at the beginning of the cell, so this cast is
379         // safe.
380         unsafe { Task::from_raw(self.cell.cast()) }
381     }
382 }
383 
can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool384 fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
385     // Load a snapshot of the current task state
386     let snapshot = header.state.load();
387 
388     debug_assert!(snapshot.is_join_interested());
389 
390     if !snapshot.is_complete() {
391         // If the task is not complete, try storing the provided waker in the
392         // task's waker field.
393 
394         let res = if snapshot.is_join_waker_set() {
395             // If JOIN_WAKER is set, then JoinHandle has previously stored a
396             // waker in the waker field per step (iii) of rule 5 in task/mod.rs.
397 
398             // Optimization: if the stored waker and the provided waker wake the
399             // same task, then return without touching the waker field. (Reading
400             // the waker field below is safe per rule 3 in task/mod.rs.)
401             if unsafe { trailer.will_wake(waker) } {
402                 return false;
403             }
404 
405             // Otherwise swap the stored waker with the provided waker by
406             // following the rule 5 in task/mod.rs.
407             header
408                 .state
409                 .unset_waker()
410                 .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
411         } else {
412             // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
413             // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
414             // of rule 5 and try to store the provided waker in the waker field.
415             set_join_waker(header, trailer, waker.clone(), snapshot)
416         };
417 
418         match res {
419             Ok(_) => return false,
420             Err(snapshot) => {
421                 assert!(snapshot.is_complete());
422             }
423         }
424     }
425     true
426 }
427 
set_join_waker( header: &Header, trailer: &Trailer, waker: Waker, snapshot: Snapshot, ) -> Result<Snapshot, Snapshot>428 fn set_join_waker(
429     header: &Header,
430     trailer: &Trailer,
431     waker: Waker,
432     snapshot: Snapshot,
433 ) -> Result<Snapshot, Snapshot> {
434     assert!(snapshot.is_join_interested());
435     assert!(!snapshot.is_join_waker_set());
436 
437     // Safety: Only the `JoinHandle` may set the `waker` field. When
438     // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
439     unsafe {
440         trailer.set_waker(Some(waker));
441     }
442 
443     // Update the `JoinWaker` state accordingly
444     let res = header.state.set_join_waker();
445 
446     // If the state could not be updated, then clear the join waker
447     if res.is_err() {
448         unsafe {
449             trailer.set_waker(None);
450         }
451     }
452 
453     res
454 }
455 
456 enum PollFuture {
457     Complete,
458     Notified,
459     Done,
460     Dealloc,
461 }
462 
463 /// Cancels the task and store the appropriate error in the stage field.
cancel_task<T: Future, S: Schedule>(core: &Core<T, S>)464 fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
465     // Drop the future from a panic guard.
466     let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
467         core.drop_future_or_output();
468     }));
469 
470     core.store_output(Err(panic_result_to_join_error(core.task_id, res)));
471 }
472 
panic_result_to_join_error( task_id: Id, res: Result<(), Box<dyn Any + Send + 'static>>, ) -> JoinError473 fn panic_result_to_join_error(
474     task_id: Id,
475     res: Result<(), Box<dyn Any + Send + 'static>>,
476 ) -> JoinError {
477     match res {
478         Ok(()) => JoinError::cancelled(task_id),
479         Err(panic) => JoinError::panic(task_id, panic),
480     }
481 }
482 
483 /// Polls the future. If the future completes, the output is written to the
484 /// stage field.
poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()>485 fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
486     // Poll the future.
487     let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
488         struct Guard<'a, T: Future, S: Schedule> {
489             core: &'a Core<T, S>,
490         }
491         impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
492             fn drop(&mut self) {
493                 // If the future panics on poll, we drop it inside the panic
494                 // guard.
495                 self.core.drop_future_or_output();
496             }
497         }
498         let guard = Guard { core };
499         let res = guard.core.poll(cx);
500         mem::forget(guard);
501         res
502     }));
503 
504     // Prepare output for being placed in the core stage.
505     let output = match output {
506         Ok(Poll::Pending) => return Poll::Pending,
507         Ok(Poll::Ready(output)) => Ok(output),
508         Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
509     };
510 
511     // Catch and ignore panics if the future panics on drop.
512     let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
513         core.store_output(output);
514     }));
515 
516     if res.is_err() {
517         core.scheduler.unhandled_panic();
518     }
519 
520     Poll::Ready(())
521 }
522 
523 #[cold]
panic_to_error<S: Schedule>( scheduler: &S, task_id: Id, panic: Box<dyn Any + Send + 'static>, ) -> JoinError524 fn panic_to_error<S: Schedule>(
525     scheduler: &S,
526     task_id: Id,
527     panic: Box<dyn Any + Send + 'static>,
528 ) -> JoinError {
529     scheduler.unhandled_panic();
530     JoinError::panic(task_id, panic)
531 }
532