1 use crate::stream::{FuturesUnordered, StreamExt};
2 use core::fmt;
3 use core::num::NonZeroUsize;
4 use core::pin::Pin;
5 use futures_core::future::{FusedFuture, Future};
6 use futures_core::stream::Stream;
7 use futures_core::task::{Context, Poll};
8 use pin_project_lite::pin_project;
9 
10 pin_project! {
11     /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
12     /// method.
13     #[must_use = "futures do nothing unless you `.await` or poll them"]
14     pub struct ForEachConcurrent<St, Fut, F> {
15         #[pin]
16         stream: Option<St>,
17         f: F,
18         futures: FuturesUnordered<Fut>,
19         limit: Option<NonZeroUsize>,
20     }
21 }
22 
23 impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F>
24 where
25     St: fmt::Debug,
26     Fut: fmt::Debug,
27 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29         f.debug_struct("ForEachConcurrent")
30             .field("stream", &self.stream)
31             .field("futures", &self.futures)
32             .field("limit", &self.limit)
33             .finish()
34     }
35 }
36 
37 impl<St, Fut, F> ForEachConcurrent<St, Fut, F>
38 where
39     St: Stream,
40     F: FnMut(St::Item) -> Fut,
41     Fut: Future<Output = ()>,
42 {
new(stream: St, limit: Option<usize>, f: F) -> Self43     pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
44         Self {
45             stream: Some(stream),
46             // Note: `limit` = 0 gets ignored.
47             limit: limit.and_then(NonZeroUsize::new),
48             f,
49             futures: FuturesUnordered::new(),
50         }
51     }
52 }
53 
54 impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F>
55 where
56     St: Stream,
57     F: FnMut(St::Item) -> Fut,
58     Fut: Future<Output = ()>,
59 {
is_terminated(&self) -> bool60     fn is_terminated(&self) -> bool {
61         self.stream.is_none() && self.futures.is_empty()
62     }
63 }
64 
65 impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
66 where
67     St: Stream,
68     F: FnMut(St::Item) -> Fut,
69     Fut: Future<Output = ()>,
70 {
71     type Output = ();
72 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>73     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
74         let mut this = self.project();
75         loop {
76             let mut made_progress_this_iter = false;
77 
78             // Check if we've already created a number of futures greater than `limit`
79             if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
80                 let mut stream_completed = false;
81                 let elem = if let Some(stream) = this.stream.as_mut().as_pin_mut() {
82                     match stream.poll_next(cx) {
83                         Poll::Ready(Some(elem)) => {
84                             made_progress_this_iter = true;
85                             Some(elem)
86                         }
87                         Poll::Ready(None) => {
88                             stream_completed = true;
89                             None
90                         }
91                         Poll::Pending => None,
92                     }
93                 } else {
94                     None
95                 };
96                 if stream_completed {
97                     this.stream.set(None);
98                 }
99                 if let Some(elem) = elem {
100                     this.futures.push((this.f)(elem));
101                 }
102             }
103 
104             match this.futures.poll_next_unpin(cx) {
105                 Poll::Ready(Some(())) => made_progress_this_iter = true,
106                 Poll::Ready(None) => {
107                     if this.stream.is_none() {
108                         return Poll::Ready(());
109                     }
110                 }
111                 Poll::Pending => {}
112             }
113 
114             if !made_progress_this_iter {
115                 return Poll::Pending;
116             }
117         }
118     }
119 }
120