1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`for_each`](super::StreamExt::for_each) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct ForEach<St, Fut, F> { 13 #[pin] 14 stream: St, 15 f: F, 16 #[pin] 17 future: Option<Fut>, 18 } 19 } 20 21 impl<St, Fut, F> fmt::Debug for ForEach<St, Fut, F> 22 where 23 St: fmt::Debug, 24 Fut: fmt::Debug, 25 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 27 f.debug_struct("ForEach") 28 .field("stream", &self.stream) 29 .field("future", &self.future) 30 .finish() 31 } 32 } 33 34 impl<St, Fut, F> ForEach<St, Fut, F> 35 where 36 St: Stream, 37 F: FnMut(St::Item) -> Fut, 38 Fut: Future<Output = ()>, 39 { new(stream: St, f: F) -> Self40 pub(super) fn new(stream: St, f: F) -> Self { 41 Self { stream, f, future: None } 42 } 43 } 44 45 impl<St, Fut, F> FusedFuture for ForEach<St, Fut, F> 46 where 47 St: FusedStream, 48 F: FnMut(St::Item) -> Fut, 49 Fut: Future<Output = ()>, 50 { is_terminated(&self) -> bool51 fn is_terminated(&self) -> bool { 52 self.future.is_none() && self.stream.is_terminated() 53 } 54 } 55 56 impl<St, Fut, F> Future for ForEach<St, Fut, F> 57 where 58 St: Stream, 59 F: FnMut(St::Item) -> Fut, 60 Fut: Future<Output = ()>, 61 { 62 type Output = (); 63 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>64 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 65 let mut this = self.project(); 66 loop { 67 if let Some(fut) = this.future.as_mut().as_pin_mut() { 68 ready!(fut.poll(cx)); 69 this.future.set(None); 70 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { 71 this.future.set(Some((this.f)(item))); 72 } else { 73 break; 74 } 75 } 76 Poll::Ready(()) 77 } 78 } 79