1 use {
2     crate::future::{CatchUnwind, FutureExt},
3     futures_channel::oneshot::{self, Receiver, Sender},
4     futures_core::{
5         future::Future,
6         ready,
7         task::{Context, Poll},
8     },
9     pin_project_lite::pin_project,
10     std::{
11         any::Any,
12         boxed::Box,
13         fmt,
14         panic::{self, AssertUnwindSafe},
15         pin::Pin,
16         sync::{
17             atomic::{AtomicBool, Ordering},
18             Arc,
19         },
20         thread,
21     },
22 };
23 
/// The handle to a remote future returned by
/// [`remote_handle`](crate::future::FutureExt::remote_handle). When you drop this,
/// the remote future will be woken up to be dropped by the executor.
///
/// ## Unwind safety
///
/// When the remote future panics, [Remote] will catch the unwind and transfer it to
/// the thread where `RemoteHandle` is being awaited. This is good for the common
/// case where [Remote] is spawned on a threadpool. It is unlikely that other code
/// in the executor working thread shares mutable data with the spawned future and we
/// preserve the executor from losing its working threads.
///
/// If you run the future locally and send the handle off to be awaited elsewhere, you
/// must be careful with regard to unwind safety because the thread in which the future
/// is polled will keep running after the panic and the thread running the [RemoteHandle]
/// will unwind.
#[must_use = "dropping a remote handle cancels the underlying future"]
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
pub struct RemoteHandle<T> {
    /// Receiving half of the oneshot channel. It carries either the remote
    /// future's output (`Ok`) or the payload of a panic caught while polling
    /// it (`Err`).
    rx: Receiver<thread::Result<T>>,
    /// Flag shared with the matching [Remote]. It starts out `false` and is
    /// set to `true` by [`forget`](RemoteHandle::forget); [Remote] reads it to
    /// decide whether to keep running once the receiver has gone away.
    keep_running: Arc<AtomicBool>,
}
47 
48 impl<T> RemoteHandle<T> {
49     /// Drops this handle *without* canceling the underlying future.
50     ///
51     /// This method can be used if you want to drop the handle, but let the
52     /// execution continue.
forget(self)53     pub fn forget(self) {
54         self.keep_running.store(true, Ordering::SeqCst);
55     }
56 }
57 
58 impl<T: 'static> Future for RemoteHandle<T> {
59     type Output = T;
60 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T>61     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
62         match ready!(self.rx.poll_unpin(cx)) {
63             Ok(Ok(output)) => Poll::Ready(output),
64             // the remote future panicked.
65             Ok(Err(e)) => panic::resume_unwind(e),
66             // The oneshot sender was dropped.
67             Err(e) => panic::resume_unwind(Box::new(e)),
68         }
69     }
70 }
71 
/// Message sent from a `Remote` to its `RemoteHandle`: the future's output on
/// success, or the boxed payload of a panic caught while polling it.
type SendMsg<Fut> = thread::Result<<Fut as Future>::Output>;
73 
pin_project! {
    /// A future which sends its output to the corresponding `RemoteHandle`.
    /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
    ///
    /// It resolves to `()` on its own; the wrapped future's result (or the
    /// payload of a panic caught while polling it) travels over a oneshot
    /// channel to the handle instead.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
    pub struct Remote<Fut: Future> {
        // Sending half of the oneshot channel. Held in an `Option` so it can
        // be taken out and consumed when the result is sent.
        tx: Option<Sender<SendMsg<Fut>>>,
        // Flag shared with the `RemoteHandle`. When `true`, this future keeps
        // running even after the receiving end has been dropped.
        keep_running: Arc<AtomicBool>,
        // The wrapped future; panics raised while polling it are caught and
        // turned into an `Err` result.
        #[pin]
        future: CatchUnwind<AssertUnwindSafe<Fut>>,
    }
}
86 
87 impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result88     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89         f.debug_tuple("Remote").field(&self.future).finish()
90     }
91 }
92 
93 impl<Fut: Future> Future for Remote<Fut> {
94     type Output = ();
95 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>96     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
97         let this = self.project();
98 
99         if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready()
100             && !this.keep_running.load(Ordering::SeqCst)
101         {
102             // Cancelled, bail out
103             return Poll::Ready(());
104         }
105 
106         let output = ready!(this.future.poll(cx));
107 
108         // if the receiving end has gone away then that's ok, we just ignore the
109         // send error here.
110         drop(this.tx.take().unwrap().send(output));
111         Poll::Ready(())
112     }
113 }
114 
remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>)115 pub(super) fn remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>) {
116     let (tx, rx) = oneshot::channel();
117     let keep_running = Arc::new(AtomicBool::new(false));
118 
119     // Unwind Safety: See the docs for RemoteHandle.
120     let wrapped = Remote {
121         future: AssertUnwindSafe(future).catch_unwind(),
122         tx: Some(tx),
123         keep_running: keep_running.clone(),
124     };
125 
126     (wrapped, RemoteHandle { rx, keep_running })
127 }
128