1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::TryStream; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`try_all`](super::TryStreamExt::try_all) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct TryAll<St, Fut, F> { 13 #[pin] 14 stream: St, 15 f: F, 16 done: bool, 17 #[pin] 18 future: Option<Fut>, 19 } 20 } 21 22 impl<St, Fut, F> fmt::Debug for TryAll<St, Fut, F> 23 where 24 St: fmt::Debug, 25 Fut: fmt::Debug, 26 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 28 f.debug_struct("TryAll") 29 .field("stream", &self.stream) 30 .field("done", &self.done) 31 .field("future", &self.future) 32 .finish() 33 } 34 } 35 36 impl<St, Fut, F> TryAll<St, Fut, F> 37 where 38 St: TryStream, 39 F: FnMut(St::Ok) -> Fut, 40 Fut: Future<Output = bool>, 41 { new(stream: St, f: F) -> Self42 pub(super) fn new(stream: St, f: F) -> Self { 43 Self { stream, f, done: false, future: None } 44 } 45 } 46 47 impl<St, Fut, F> FusedFuture for TryAll<St, Fut, F> 48 where 49 St: TryStream, 50 F: FnMut(St::Ok) -> Fut, 51 Fut: Future<Output = bool>, 52 { is_terminated(&self) -> bool53 fn is_terminated(&self) -> bool { 54 self.done && self.future.is_none() 55 } 56 } 57 58 impl<St, Fut, F> Future for TryAll<St, Fut, F> 59 where 60 St: TryStream, 61 F: FnMut(St::Ok) -> Fut, 62 Fut: Future<Output = bool>, 63 { 64 type Output = Result<bool, St::Error>; 65 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, St::Error>>66 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, St::Error>> { 67 let mut this = self.project(); 68 69 Poll::Ready(loop { 70 if let Some(fut) = this.future.as_mut().as_pin_mut() { 71 // we're currently processing a future to produce a new value 72 let acc = ready!(fut.poll(cx)); 73 this.future.set(None); 74 if !acc { 75 *this.done = true; 76 break Ok(false); 77 } // early exit 78 } else if !*this.done { 79 // we're waiting on a new item from the stream 80 match ready!(this.stream.as_mut().try_poll_next(cx)) { 81 Some(Ok(item)) => { 82 this.future.set(Some((this.f)(item))); 83 } 84 Some(Err(err)) => { 85 *this.done = true; 86 break Err(err); 87 } 88 None => { 89 *this.done = true; 90 break Ok(true); 91 } 92 } 93 } else { 94 panic!("TryAll polled after completion") 95 } 96 }) 97 } 98 } 99