1 use crate::runtime::task::{Header, RawTask};
2 
3 use std::fmt;
4 use std::future::Future;
5 use std::marker::PhantomData;
6 use std::panic::{RefUnwindSafe, UnwindSafe};
7 use std::pin::Pin;
8 use std::task::{ready, Context, Poll, Waker};
9 
10 cfg_rt! {
11     /// An owned permission to join on a task (await its termination).
12     ///
13     /// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
14     /// for a Tokio task rather than a thread. Note that the background task
15     /// associated with this `JoinHandle` started running immediately when you
16     /// called spawn, even if you have not yet awaited the `JoinHandle`.
17     ///
18     /// A `JoinHandle` *detaches* the associated task when it is dropped, which
19     /// means that there is no longer any handle to the task, and no way to `join`
20     /// on it.
21     ///
22     /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
23     /// functions.
24     ///
25     /// # Cancel safety
26     ///
27     /// The `&mut JoinHandle<T>` type is cancel safe. If it is used as the event
28     /// in a `tokio::select!` statement and some other branch completes first,
29     /// then it is guaranteed that the output of the task is not lost.
30     ///
31     /// If a `JoinHandle` is dropped, then the task continues running in the
32     /// background and its return value is lost.
33     ///
34     /// # Examples
35     ///
36     /// Creation from [`task::spawn`]:
37     ///
38     /// ```
39     /// use tokio::task;
40     ///
41     /// # async fn doc() {
42     /// let join_handle: task::JoinHandle<_> = task::spawn(async {
43     ///     // some work here
44     /// });
45     /// # }
46     /// ```
47     ///
48     /// Creation from [`task::spawn_blocking`]:
49     ///
50     /// ```
51     /// use tokio::task;
52     ///
53     /// # async fn doc() {
54     /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
55     ///     // some blocking work here
56     /// });
57     /// # }
58     /// ```
59     ///
60     /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
61     /// If the return value is an `i32`, the join handle has type `JoinHandle<i32>`:
62     ///
63     /// ```
64     /// use tokio::task;
65     ///
66     /// # async fn doc() {
67     /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
68     ///     5 + 3
69     /// });
70     /// # }
71     ///
72     /// ```
73     ///
74     /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
75     ///
76     /// ```
77     /// use tokio::task;
78     ///
79     /// # async fn doc() {
80     /// let join_handle: task::JoinHandle<()> = task::spawn(async {
81     ///     println!("I return nothing.");
82     /// });
83     /// # }
84     /// ```
85     ///
86     /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
87     /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
88     /// to be double chained to extract the returned value:
89     ///
90     /// ```
91     /// use tokio::task;
92     /// use std::io;
93     ///
94     /// #[tokio::main]
95     /// async fn main() -> io::Result<()> {
96     ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
97     ///         Ok(5 + 3)
98     ///     });
99     ///
100     ///     let result = join_handle.await??;
101     ///     assert_eq!(result, 8);
102     ///     Ok(())
103     /// }
104     /// ```
105     ///
106     /// If the task panics, the error is a [`JoinError`] that contains the panic:
107     ///
108     /// ```
109     /// use tokio::task;
110     /// use std::io;
111     /// use std::panic;
112     ///
113     /// #[tokio::main]
114     /// async fn main() -> io::Result<()> {
115     ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
116     ///         panic!("boom");
117     ///     });
118     ///
119     ///     let err = join_handle.await.unwrap_err();
120     ///     assert!(err.is_panic());
121     ///     Ok(())
122     /// }
123     ///
124     /// ```
125     /// Child being detached and outliving its parent:
126     ///
127     /// ```no_run
128     /// use tokio::task;
129     /// use tokio::time;
130     /// use std::time::Duration;
131     ///
132     /// # #[tokio::main] async fn main() {
133     /// let original_task = task::spawn(async {
134     ///     let _detached_task = task::spawn(async {
135     ///         // Here we sleep to make sure that the first task returns before.
136     ///         time::sleep(Duration::from_millis(10)).await;
137     ///         // This will be called, even though the JoinHandle is dropped.
138     ///         println!("♫ Still alive ♫");
139     ///     });
140     /// });
141     ///
142     /// original_task.await.expect("The task being joined has panicked");
143     /// println!("Original task is joined.");
144     ///
145     /// // We make sure that the new task has time to run, before the main
146     /// // task returns.
147     ///
148     /// time::sleep(Duration::from_millis(1000)).await;
149     /// # }
150     /// ```
151     ///
152     /// [`task::spawn`]: crate::task::spawn()
153     /// [`task::spawn_blocking`]: crate::task::spawn_blocking
154     /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
155     /// [`JoinError`]: crate::task::JoinError
156     pub struct JoinHandle<T> {
157         raw: RawTask,
158         _p: PhantomData<T>,
159     }
160 }
161 
162 unsafe impl<T: Send> Send for JoinHandle<T> {}
163 unsafe impl<T: Send> Sync for JoinHandle<T> {}
164 
165 impl<T> UnwindSafe for JoinHandle<T> {}
166 impl<T> RefUnwindSafe for JoinHandle<T> {}
167 
168 impl<T> JoinHandle<T> {
new(raw: RawTask) -> JoinHandle<T>169     pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
170         JoinHandle {
171             raw,
172             _p: PhantomData,
173         }
174     }
175 
176     /// Abort the task associated with the handle.
177     ///
178     /// Awaiting a cancelled task might complete as usual if the task was
179     /// already completed at the time it was cancelled, but most likely it
180     /// will fail with a [cancelled] `JoinError`.
181     ///
182     /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
183     /// because they are not async. If you call `abort` on a `spawn_blocking`
184     /// task, then this *will not have any effect*, and the task will continue
185     /// running normally. The exception is if the task has not started running
186     /// yet; in that case, calling `abort` may prevent the task from starting.
187     ///
188     /// See also [the module level docs] for more information on cancellation.
189     ///
190     /// ```rust
191     /// use tokio::time;
192     ///
193     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
194     /// # async fn main() {
195     /// let mut handles = Vec::new();
196     ///
197     /// handles.push(tokio::spawn(async {
198     ///    time::sleep(time::Duration::from_secs(10)).await;
199     ///    true
200     /// }));
201     ///
202     /// handles.push(tokio::spawn(async {
203     ///    time::sleep(time::Duration::from_secs(10)).await;
204     ///    false
205     /// }));
206     ///
207     /// for handle in &handles {
208     ///     handle.abort();
209     /// }
210     ///
211     /// for handle in handles {
212     ///     assert!(handle.await.unwrap_err().is_cancelled());
213     /// }
214     /// # }
215     /// ```
216     ///
217     /// [cancelled]: method@super::error::JoinError::is_cancelled
218     /// [the module level docs]: crate::task#cancellation
219     /// [`spawn_blocking`]: crate::task::spawn_blocking
abort(&self)220     pub fn abort(&self) {
221         self.raw.remote_abort();
222     }
223 
224     /// Checks if the task associated with this `JoinHandle` has finished.
225     ///
226     /// Please note that this method can return `false` even if [`abort`] has been
227     /// called on the task. This is because the cancellation process may take
228     /// some time, and this method does not return `true` until it has
229     /// completed.
230     ///
231     /// ```rust
232     /// use tokio::time;
233     ///
234     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
235     /// # async fn main() {
236     /// let handle1 = tokio::spawn(async {
237     ///     // do some stuff here
238     /// });
239     /// let handle2 = tokio::spawn(async {
240     ///     // do some other stuff here
241     ///     time::sleep(time::Duration::from_secs(10)).await;
242     /// });
243     /// // Wait for the task to finish
244     /// handle2.abort();
245     /// time::sleep(time::Duration::from_secs(1)).await;
246     /// assert!(handle1.is_finished());
247     /// assert!(handle2.is_finished());
248     /// # }
249     /// ```
250     /// [`abort`]: method@JoinHandle::abort
is_finished(&self) -> bool251     pub fn is_finished(&self) -> bool {
252         let state = self.raw.header().state.load();
253         state.is_complete()
254     }
255 
256     /// Set the waker that is notified when the task completes.
set_join_waker(&mut self, waker: &Waker)257     pub(crate) fn set_join_waker(&mut self, waker: &Waker) {
258         if self.raw.try_set_join_waker(waker) {
259             // In this case the task has already completed. We wake the waker immediately.
260             waker.wake_by_ref();
261         }
262     }
263 
264     /// Returns a new `AbortHandle` that can be used to remotely abort this task.
265     ///
266     /// Awaiting a task cancelled by the `AbortHandle` might complete as usual if the task was
267     /// already completed at the time it was cancelled, but most likely it
268     /// will fail with a [cancelled] `JoinError`.
269     ///
270     /// ```rust
271     /// use tokio::{time, task};
272     ///
273     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
274     /// # async fn main() {
275     /// let mut handles = Vec::new();
276     ///
277     /// handles.push(tokio::spawn(async {
278     ///    time::sleep(time::Duration::from_secs(10)).await;
279     ///    true
280     /// }));
281     ///
282     /// handles.push(tokio::spawn(async {
283     ///    time::sleep(time::Duration::from_secs(10)).await;
284     ///    false
285     /// }));
286     ///
287     /// let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect();
288     ///
289     /// for handle in abort_handles {
290     ///     handle.abort();
291     /// }
292     ///
293     /// for handle in handles {
294     ///     assert!(handle.await.unwrap_err().is_cancelled());
295     /// }
296     /// # }
297     /// ```
298     /// [cancelled]: method@super::error::JoinError::is_cancelled
299     #[must_use = "abort handles do nothing unless `.abort` is called"]
abort_handle(&self) -> super::AbortHandle300     pub fn abort_handle(&self) -> super::AbortHandle {
301         self.raw.ref_inc();
302         super::AbortHandle::new(self.raw)
303     }
304 
305     /// Returns a [task ID] that uniquely identifies this task relative to other
306     /// currently spawned tasks.
307     ///
308     /// [task ID]: crate::task::Id
id(&self) -> super::Id309     pub fn id(&self) -> super::Id {
310         // Safety: The header pointer is valid.
311         unsafe { Header::get_id(self.raw.header_ptr()) }
312     }
313 }
314 
315 impl<T> Unpin for JoinHandle<T> {}
316 
317 impl<T> Future for JoinHandle<T> {
318     type Output = super::Result<T>;
319 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>320     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
321         ready!(crate::trace::trace_leaf(cx));
322         let mut ret = Poll::Pending;
323 
324         // Keep track of task budget
325         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
326 
327         // Try to read the task output. If the task is not yet complete, the
328         // waker is stored and is notified once the task does complete.
329         //
330         // The function must go via the vtable, which requires erasing generic
331         // types. To do this, the function "return" is placed on the stack
332         // **before** calling the function and is passed into the function using
333         // `*mut ()`.
334         //
335         // Safety:
336         //
337         // The type of `T` must match the task's output type.
338         unsafe {
339             self.raw
340                 .try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
341         }
342 
343         if ret.is_ready() {
344             coop.made_progress();
345         }
346 
347         ret
348     }
349 }
350 
351 impl<T> Drop for JoinHandle<T> {
drop(&mut self)352     fn drop(&mut self) {
353         if self.raw.state().drop_join_handle_fast().is_ok() {
354             return;
355         }
356 
357         self.raw.drop_join_handle_slow();
358     }
359 }
360 
361 impl<T> fmt::Debug for JoinHandle<T>
362 where
363     T: fmt::Debug,
364 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result365     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
366         // Safety: The header pointer is valid.
367         let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) };
368         let id = unsafe { id_ptr.as_ref() };
369         fmt.debug_struct("JoinHandle").field("id", id).finish()
370     }
371 }
372