1 macro_rules! doc {
2     ($try_join:item) => {
3         /// Waits on multiple concurrent branches, returning when **all** branches
4         /// complete with `Ok(_)` or on the first `Err(_)`.
5         ///
6         /// The `try_join!` macro must be used inside of async functions, closures, and
7         /// blocks.
8         ///
9         /// Similar to [`join!`], the `try_join!` macro takes a list of async
10         /// expressions and evaluates them concurrently on the same task. Each async
11         /// expression evaluates to a future and the futures from each expression are
12         /// multiplexed on the current task. The `try_join!` macro returns when **all**
13         /// branches return with `Ok` or when the **first** branch returns with `Err`.
14         ///
15         /// [`join!`]: macro@join
16         ///
17         /// # Notes
18         ///
19         /// The supplied futures are stored inline and do not require allocating a
20         /// `Vec`.
21         ///
22         /// ### Runtime characteristics
23         ///
24         /// By running all async expressions on the current task, the expressions are
25         /// able to run **concurrently** but not in **parallel**. This means all
26         /// expressions are run on the same thread and if one branch blocks the thread,
27         /// all other expressions will be unable to continue. If parallelism is
28         /// required, spawn each async expression using [`tokio::spawn`] and pass the
29         /// join handle to `try_join!`.
30         ///
31         /// [`tokio::spawn`]: crate::spawn
32         ///
33         /// # Examples
34         ///
35         /// Basic `try_join` with two branches.
36         ///
37         /// ```
38         /// async fn do_stuff_async() -> Result<(), &'static str> {
39         ///     // async work
40         /// # Ok(())
41         /// }
42         ///
43         /// async fn more_async_work() -> Result<(), &'static str> {
44         ///     // more here
45         /// # Ok(())
46         /// }
47         ///
48         /// #[tokio::main]
49         /// async fn main() {
50         ///     let res = tokio::try_join!(
51         ///         do_stuff_async(),
52         ///         more_async_work());
53         ///
54         ///     match res {
55         ///          Ok((first, second)) => {
56         ///              // do something with the values
57         ///          }
58         ///          Err(err) => {
59         ///             println!("processing failed; error = {}", err);
60         ///          }
61         ///     }
62         /// }
63         /// ```
64         ///
65         /// Using `try_join!` with spawned tasks.
66         ///
67         /// ```
68         /// use tokio::task::JoinHandle;
69         ///
70         /// async fn do_stuff_async() -> Result<(), &'static str> {
71         ///     // async work
72         /// # Err("failed")
73         /// }
74         ///
75         /// async fn more_async_work() -> Result<(), &'static str> {
76         ///     // more here
77         /// # Ok(())
78         /// }
79         ///
80         /// async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> {
81         ///     match handle.await {
82         ///         Ok(Ok(result)) => Ok(result),
83         ///         Ok(Err(err)) => Err(err),
84         ///         Err(err) => Err("handling failed"),
85         ///     }
86         /// }
87         ///
88         /// #[tokio::main]
89         /// async fn main() {
90         ///     let handle1 = tokio::spawn(do_stuff_async());
91         ///     let handle2 = tokio::spawn(more_async_work());
92         ///     match tokio::try_join!(flatten(handle1), flatten(handle2)) {
93         ///         Ok(val) => {
94         ///             // do something with the values
95         ///         }
96         ///         Err(err) => {
97         ///             println!("Failed with {}.", err);
98         ///             # assert_eq!(err, "failed");
99         ///         }
100         ///     }
101         /// }
102         /// ```
103         #[macro_export]
104         #[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
105         $try_join
106     };
107 }
108 
109 #[cfg(doc)]
110 doc! {macro_rules! try_join {
111     ($($future:expr),*) => { unimplemented!() }
112 }}
113 
114 #[cfg(not(doc))]
115 doc! {macro_rules! try_join {
116     (@ {
117         // One `_` for each branch in the `try_join!` macro. This is not used once
118         // normalization is complete.
119         ( $($count:tt)* )
120 
121         // The expression `0+1+1+ ... +1` equal to the number of branches.
122         ( $($total:tt)* )
123 
124         // Normalized try_join! branches
125         $( ( $($skip:tt)* ) $e:expr, )*
126 
127     }) => {{
128         use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
129         use $crate::macros::support::Poll::{Ready, Pending};
130 
131         // Safety: nothing must be moved out of `futures`. This is to satisfy
132         // the requirement of `Pin::new_unchecked` called below.
133         //
134         // We can't use the `pin!` macro for this because `futures` is a tuple
135         // and the standard library provides no way to pin-project to the fields
136         // of a tuple.
137         let mut futures = ( $( maybe_done($e), )* );
138 
139         // This assignment makes sure that the `poll_fn` closure only has a
140         // reference to the futures, instead of taking ownership of them. This
141         // mitigates the issue described in
142         // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
143         let mut futures = &mut futures;
144 
145         // Each time the future created by poll_fn is polled, a different future will be polled first
146         // to ensure every future passed to join! gets a chance to make progress even if
147         // one of the futures consumes the whole budget.
148         //
149         // This is number of futures that will be skipped in the first loop
150         // iteration the next time.
151         let mut skip_next_time: u32 = 0;
152 
153         poll_fn(move |cx| {
154             const COUNT: u32 = $($total)*;
155 
156             let mut is_pending = false;
157 
158             let mut to_run = COUNT;
159 
160             // The number of futures that will be skipped in the first loop iteration
161             let mut skip = skip_next_time;
162 
163             skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
164 
165             // This loop runs twice and the first `skip` futures
166             // are not polled in the first iteration.
167             loop {
168             $(
169                 if skip == 0 {
170                     if to_run == 0 {
171                         // Every future has been polled
172                         break;
173                     }
174                     to_run -= 1;
175 
176                     // Extract the future for this branch from the tuple.
177                     let ( $($skip,)* fut, .. ) = &mut *futures;
178 
179                     // Safety: future is stored on the stack above
180                     // and never moved.
181                     let mut fut = unsafe { Pin::new_unchecked(fut) };
182 
183                     // Try polling
184                     if fut.as_mut().poll(cx).is_pending() {
185                         is_pending = true;
186                     } else if fut.as_mut().output_mut().expect("expected completed future").is_err() {
187                         return Ready(Err(fut.take_output().expect("expected completed future").err().unwrap()))
188                     }
189                 } else {
190                     // Future skipped, one less future to skip in the next iteration
191                     skip -= 1;
192                 }
193             )*
194             }
195 
196             if is_pending {
197                 Pending
198             } else {
199                 Ready(Ok(($({
200                     // Extract the future for this branch from the tuple.
201                     let ( $($skip,)* fut, .. ) = &mut futures;
202 
203                     // Safety: future is stored on the stack above
204                     // and never moved.
205                     let mut fut = unsafe { Pin::new_unchecked(fut) };
206 
207                     fut
208                         .take_output()
209                         .expect("expected completed future")
210                         .ok()
211                         .expect("expected Ok(_)")
212                 },)*)))
213             }
214         }).await
215     }};
216 
217     // ===== Normalize =====
218 
219     (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
220       $crate::try_join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
221     };
222 
223     // ===== Entry point =====
224 
225     ( $($e:expr),+ $(,)?) => {
226         $crate::try_join!(@{ () (0) } $($e,)*)
227     };
228 
229     () => { async { Ok(()) }.await }
230 }}
231