// Wraps the macro definition passed as `$try_join` with the shared rustdoc
// text below, `#[macro_export]`, and (when the `docsrs` cfg is set) a
// `doc(cfg(feature = "macros"))` annotation. Both `cfg` variants of
// `try_join!` further down are routed through this helper, so they carry the
// same documentation and attributes.
macro_rules! doc {
    ($try_join:item) => {
        /// Waits on multiple concurrent branches, returning when **all** branches
        /// complete with `Ok(_)` or on the first `Err(_)`.
        ///
        /// The `try_join!` macro must be used inside of async functions, closures, and
        /// blocks.
        ///
        /// Similar to [`join!`], the `try_join!` macro takes a list of async
        /// expressions and evaluates them concurrently on the same task. Each async
        /// expression evaluates to a future and the futures from each expression are
        /// multiplexed on the current task. The `try_join!` macro returns when **all**
        /// branches return with `Ok` or when the **first** branch returns with `Err`.
        ///
        /// [`join!`]: macro@join
        ///
        /// # Notes
        ///
        /// The supplied futures are stored inline and do not require allocating a
        /// `Vec`.
        ///
        /// ### Runtime characteristics
        ///
        /// By running all async expressions on the current task, the expressions are
        /// able to run **concurrently** but not in **parallel**. This means all
        /// expressions are run on the same thread and if one branch blocks the thread,
        /// all other expressions will be unable to continue. If parallelism is
        /// required, spawn each async expression using [`tokio::spawn`] and pass the
        /// join handle to `try_join!`.
        ///
        /// [`tokio::spawn`]: crate::spawn
        ///
        /// # Examples
        ///
        /// Basic `try_join` with two branches.
        ///
        /// ```
        /// async fn do_stuff_async() -> Result<(), &'static str> {
        ///     // async work
        /// # Ok(())
        /// }
        ///
        /// async fn more_async_work() -> Result<(), &'static str> {
        ///     // more here
        /// # Ok(())
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let res = tokio::try_join!(
        ///         do_stuff_async(),
        ///         more_async_work());
        ///
        ///     match res {
        ///          Ok((first, second)) => {
        ///              // do something with the values
        ///          }
        ///          Err(err) => {
        ///             println!("processing failed; error = {}", err);
        ///          }
        ///     }
        /// }
        /// ```
        ///
        /// Using `try_join!` with spawned tasks.
        ///
        /// ```
        /// use tokio::task::JoinHandle;
        ///
        /// async fn do_stuff_async() -> Result<(), &'static str> {
        ///     // async work
        /// # Err("failed")
        /// }
        ///
        /// async fn more_async_work() -> Result<(), &'static str> {
        ///     // more here
        /// # Ok(())
        /// }
        ///
        /// async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> {
        ///     match handle.await {
        ///         Ok(Ok(result)) => Ok(result),
        ///         Ok(Err(err)) => Err(err),
        ///         Err(err) => Err("handling failed"),
        ///     }
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let handle1 = tokio::spawn(do_stuff_async());
        ///     let handle2 = tokio::spawn(more_async_work());
        ///     match tokio::try_join!(flatten(handle1), flatten(handle2)) {
        ///         Ok(val) => {
        ///             // do something with the values
        ///         }
        ///         Err(err) => {
        ///             println!("Failed with {}.", err);
        ///             # assert_eq!(err, "failed");
        ///         }
        ///     }
        /// }
        /// ```
        #[macro_export]
        #[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
        $try_join
    };
}

// Definition used when `cfg(doc)` is set: a single public-facing arm whose
// body is just `unimplemented!()`.
// NOTE(review): presumably this keeps the internal `@ { ... }` arms of the
// real implementation out of the rendered macro signature — confirm.
#[cfg(doc)]
doc! {macro_rules! try_join {
    ($($future:expr),*) => { unimplemented!() }
}}

// Real implementation, used whenever `cfg(doc)` is not set.
//
// The arms are, in order:
//   1. the fully-normalized form `@ { (count) (total) branches... }`, which
//      expands to the polling code;
//   2. the normalizer, which moves one expression at a time from the raw
//      input into the normalized state;
//   3. the public entry point (one or more expressions);
//   4. the empty invocation.
#[cfg(not(doc))]
doc! {macro_rules! try_join {
    (@ {
        // One `_` for each branch in the `try_join!` macro. This is not used once
        // normalization is complete.
        ( $($count:tt)* )

        // The expression `0+1+1+ ... +1` equal to the number of branches.
        ( $($total:tt)* )

        // Normalized try_join! branches. Each branch is preceded by a
        // parenthesized run of `_` tokens (`$skip`), one per branch that comes
        // before it; these are reused below as tuple patterns that skip over
        // the earlier fields of `futures`.
        $( ( $($skip:tt)* ) $e:expr, )*

    }) => {{
        use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
        use $crate::macros::support::Poll::{Ready, Pending};

        // Safety: nothing must be moved out of `futures`. This is to satisfy
        // the requirement of `Pin::new_unchecked` called below.
        //
        // We can't use the `pin!` macro for this because `futures` is a tuple
        // and the standard library provides no way to pin-project to the fields
        // of a tuple.
        let mut futures = ( $( maybe_done($e), )* );

        // This assignment makes sure that the `poll_fn` closure only has a
        // reference to the futures, instead of taking ownership of them. This
        // mitigates the issue described in
        // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
        let mut futures = &mut futures;

        // Each time the future created by `poll_fn` is polled, a different future
        // will be polled first to ensure every future passed to `try_join!` gets
        // a chance to make progress even if one of the futures consumes the
        // whole budget.
        //
        // This is the number of futures that will be skipped in the first loop
        // iteration the next time.
        let mut skip_next_time: u32 = 0;

        poll_fn(move |cx| {
            // Number of branches; `$total` expands to `0 + 1 + 1 + ...`.
            const COUNT: u32 = $($total)*;

            // Set as soon as any branch reports `Pending` during this poll.
            let mut is_pending = false;

            // Number of branches still to be polled during this poll.
            let mut to_run = COUNT;

            // The number of futures that will be skipped in the first loop iteration
            let mut skip = skip_next_time;

            // Rotate the starting branch for the next poll, wrapping to 0 after
            // the last branch.
            skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };

            // This loop makes two passes over the branch list: the first pass
            // skips the first `skip` futures and polls the remaining ones; the
            // second pass polls the futures that were skipped and then breaks
            // once `to_run` reaches zero.
            loop {
            $(
                if skip == 0 {
                    if to_run == 0 {
                        // Every future has been polled
                        break;
                    }
                    to_run -= 1;

                    // Extract the future for this branch from the tuple.
                    let ( $($skip,)* fut, .. ) = &mut *futures;

                    // Safety: future is stored on the stack above
                    // and never moved.
                    let mut fut = unsafe { Pin::new_unchecked(fut) };

                    // Try polling
                    if fut.as_mut().poll(cx).is_pending() {
                        is_pending = true;
                    } else if fut.as_mut().output_mut().expect("expected completed future").is_err() {
                        // Short-circuit on the first error. The `is_err()` check
                        // above makes `.err()` return `Some`; `.err().unwrap()`
                        // is used instead of `Result::unwrap_err`, which would
                        // require the `Ok` type to implement `Debug`.
                        return Ready(Err(fut.take_output().expect("expected completed future").err().unwrap()))
                    }
                } else {
                    // Future skipped, one less future to skip in the next iteration
                    skip -= 1;
                }
            )*
            }

            if is_pending {
                Pending
            } else {
                // No branch is pending and none returned early with `Err`, so
                // collect every branch's `Ok` value into the result tuple, in
                // the order the branches were written.
                Ready(Ok(($({
                    // Extract the future for this branch from the tuple.
                    let ( $($skip,)* fut, .. ) = &mut futures;

                    // Safety: future is stored on the stack above
                    // and never moved.
                    let mut fut = unsafe { Pin::new_unchecked(fut) };

                    // `.ok().expect(..)` rather than `Result::expect`, which
                    // would require the error type to implement `Debug`.
                    fut
                        .take_output()
                        .expect("expected completed future")
                        .ok()
                        .expect("expected Ok(_)")
                },)*)))
            }
        }).await
    }};

    // ===== Normalize =====

    // Consumes one expression `$e` from the raw input per step. The branch is
    // appended to the normalized list together with the `_` tokens accumulated
    // so far (`$s`, i.e. one per earlier branch); then the count grows by one
    // `_` and the total by `+ 1` before recursing on the remaining input `$r`.
    (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
        $crate::try_join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
    };

    // ===== Entry point =====

    // One or more comma-separated expressions, with an optional trailing comma.
    // Starts normalization with an empty count `()` and a total of `(0)`.
    ( $($e:expr),+ $(,)?) => {
        $crate::try_join!(@{ () (0) } $($e,)*)
    };

    // No branches: completes immediately with `Ok(())`.
    () => { async { Ok(()) }.await }
}}