1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::{FusedFuture, Future};
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream};
6 use futures_core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Future for the [`for_each`](super::StreamExt::for_each) method.
11     #[must_use = "futures do nothing unless you `.await` or poll them"]
12     pub struct ForEach<St, Fut, F> {
13         #[pin]
14         stream: St,
15         f: F,
16         #[pin]
17         future: Option<Fut>,
18     }
19 }
20 
21 impl<St, Fut, F> fmt::Debug for ForEach<St, Fut, F>
22 where
23     St: fmt::Debug,
24     Fut: fmt::Debug,
25 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result26     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27         f.debug_struct("ForEach")
28             .field("stream", &self.stream)
29             .field("future", &self.future)
30             .finish()
31     }
32 }
33 
34 impl<St, Fut, F> ForEach<St, Fut, F>
35 where
36     St: Stream,
37     F: FnMut(St::Item) -> Fut,
38     Fut: Future<Output = ()>,
39 {
new(stream: St, f: F) -> Self40     pub(super) fn new(stream: St, f: F) -> Self {
41         Self { stream, f, future: None }
42     }
43 }
44 
45 impl<St, Fut, F> FusedFuture for ForEach<St, Fut, F>
46 where
47     St: FusedStream,
48     F: FnMut(St::Item) -> Fut,
49     Fut: Future<Output = ()>,
50 {
is_terminated(&self) -> bool51     fn is_terminated(&self) -> bool {
52         self.future.is_none() && self.stream.is_terminated()
53     }
54 }
55 
56 impl<St, Fut, F> Future for ForEach<St, Fut, F>
57 where
58     St: Stream,
59     F: FnMut(St::Item) -> Fut,
60     Fut: Future<Output = ()>,
61 {
62     type Output = ();
63 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>64     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
65         let mut this = self.project();
66         loop {
67             if let Some(fut) = this.future.as_mut().as_pin_mut() {
68                 ready!(fut.poll(cx));
69                 this.future.set(None);
70             } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
71                 this.future.set(Some((this.f)(item)));
72             } else {
73                 break;
74             }
75         }
76         Poll::Ready(())
77     }
78 }
79