1 use crate::stream::{FuturesUnordered, StreamExt};
2 use core::fmt;
3 use core::mem;
4 use core::num::NonZeroUsize;
5 use core::pin::Pin;
6 use futures_core::future::{FusedFuture, Future};
7 use futures_core::stream::TryStream;
8 use futures_core::task::{Context, Poll};
9 use pin_project_lite::pin_project;
10 
pin_project! {
    /// Future for the
    /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
    /// method.
    ///
    /// Items are pulled from the underlying stream, handed to the closure, and
    /// the futures it returns are driven concurrently. The first error produced
    /// by either the stream or one of those futures is returned and all
    /// remaining work is dropped.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TryForEachConcurrent<St, Fut, F> {
        // Source stream. Set to `None` once it has yielded `None` or once an
        // error has been returned, so it is never polled again afterwards.
        #[pin]
        stream: Option<St>,
        // Closure invoked on every `Ok` item to produce the future to run.
        f: F,
        // Futures created by `f` that have not completed yet.
        futures: FuturesUnordered<Fut>,
        // Upper bound on the number of in-flight futures; `None` means the
        // number of in-flight futures is not bounded.
        limit: Option<NonZeroUsize>,
    }
}
24 
25 impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F>
26 where
27     St: fmt::Debug,
28     Fut: fmt::Debug,
29 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result30     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31         f.debug_struct("TryForEachConcurrent")
32             .field("stream", &self.stream)
33             .field("futures", &self.futures)
34             .field("limit", &self.limit)
35             .finish()
36     }
37 }
38 
39 impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F>
40 where
41     St: TryStream,
42     F: FnMut(St::Ok) -> Fut,
43     Fut: Future<Output = Result<(), St::Error>>,
44 {
is_terminated(&self) -> bool45     fn is_terminated(&self) -> bool {
46         self.stream.is_none() && self.futures.is_empty()
47     }
48 }
49 
50 impl<St, Fut, F> TryForEachConcurrent<St, Fut, F>
51 where
52     St: TryStream,
53     F: FnMut(St::Ok) -> Fut,
54     Fut: Future<Output = Result<(), St::Error>>,
55 {
new(stream: St, limit: Option<usize>, f: F) -> Self56     pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
57         Self {
58             stream: Some(stream),
59             // Note: `limit` = 0 gets ignored.
60             limit: limit.and_then(NonZeroUsize::new),
61             f,
62             futures: FuturesUnordered::new(),
63         }
64     }
65 }
66 
67 impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
68 where
69     St: TryStream,
70     F: FnMut(St::Ok) -> Fut,
71     Fut: Future<Output = Result<(), St::Error>>,
72 {
73     type Output = Result<(), St::Error>;
74 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>75     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
76         let mut this = self.project();
77         loop {
78             let mut made_progress_this_iter = false;
79 
80             // Check if we've already created a number of futures greater than `limit`
81             if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
82                 let poll_res = match this.stream.as_mut().as_pin_mut() {
83                     Some(stream) => stream.try_poll_next(cx),
84                     None => Poll::Ready(None),
85                 };
86 
87                 let elem = match poll_res {
88                     Poll::Ready(Some(Ok(elem))) => {
89                         made_progress_this_iter = true;
90                         Some(elem)
91                     }
92                     Poll::Ready(None) => {
93                         this.stream.set(None);
94                         None
95                     }
96                     Poll::Pending => None,
97                     Poll::Ready(Some(Err(e))) => {
98                         // Empty the stream and futures so that we know
99                         // the future has completed.
100                         this.stream.set(None);
101                         drop(mem::replace(this.futures, FuturesUnordered::new()));
102                         return Poll::Ready(Err(e));
103                     }
104                 };
105 
106                 if let Some(elem) = elem {
107                     this.futures.push((this.f)(elem));
108                 }
109             }
110 
111             match this.futures.poll_next_unpin(cx) {
112                 Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
113                 Poll::Ready(None) => {
114                     if this.stream.is_none() {
115                         return Poll::Ready(Ok(()));
116                     }
117                 }
118                 Poll::Pending => {}
119                 Poll::Ready(Some(Err(e))) => {
120                     // Empty the stream and futures so that we know
121                     // the future has completed.
122                     this.stream.set(None);
123                     drop(mem::replace(this.futures, FuturesUnordered::new()));
124                     return Poll::Ready(Err(e));
125                 }
126             }
127 
128             if !made_progress_this_iter {
129                 return Poll::Pending;
130             }
131         }
132     }
133 }
134