1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::Stream; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`all`](super::StreamExt::all) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct All<St, Fut, F> { 13 #[pin] 14 stream: St, 15 f: F, 16 done: bool, 17 #[pin] 18 future: Option<Fut>, 19 } 20 } 21 22 impl<St, Fut, F> fmt::Debug for All<St, Fut, F> 23 where 24 St: fmt::Debug, 25 Fut: fmt::Debug, 26 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 28 f.debug_struct("All") 29 .field("stream", &self.stream) 30 .field("done", &self.done) 31 .field("future", &self.future) 32 .finish() 33 } 34 } 35 36 impl<St, Fut, F> All<St, Fut, F> 37 where 38 St: Stream, 39 F: FnMut(St::Item) -> Fut, 40 Fut: Future<Output = bool>, 41 { new(stream: St, f: F) -> Self42 pub(super) fn new(stream: St, f: F) -> Self { 43 Self { stream, f, done: false, future: None } 44 } 45 } 46 47 impl<St, Fut, F> FusedFuture for All<St, Fut, F> 48 where 49 St: Stream, 50 F: FnMut(St::Item) -> Fut, 51 Fut: Future<Output = bool>, 52 { is_terminated(&self) -> bool53 fn is_terminated(&self) -> bool { 54 self.done && self.future.is_none() 55 } 56 } 57 58 impl<St, Fut, F> Future for All<St, Fut, F> 59 where 60 St: Stream, 61 F: FnMut(St::Item) -> Fut, 62 Fut: Future<Output = bool>, 63 { 64 type Output = bool; 65 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool>66 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> { 67 let mut this = self.project(); 68 Poll::Ready(loop { 69 if let Some(fut) = this.future.as_mut().as_pin_mut() { 70 // we're currently processing a future to produce a new value 71 let res = ready!(fut.poll(cx)); 72 this.future.set(None); 73 if !res { 74 *this.done = true; 75 break false; 76 } // early exit 77 } else if !*this.done { 78 // we're waiting on a new item from the stream 79 match ready!(this.stream.as_mut().poll_next(cx)) { 80 Some(item) => { 81 this.future.set(Some((this.f)(item))); 82 } 83 None => { 84 *this.done = true; 85 break true; 86 } 87 } 88 } else { 89 panic!("All polled after completion") 90 } 91 }) 92 } 93 } 94