1 use crate::stream::{FuturesUnordered, StreamExt}; 2 use core::fmt; 3 use core::num::NonZeroUsize; 4 use core::pin::Pin; 5 use futures_core::future::{FusedFuture, Future}; 6 use futures_core::stream::Stream; 7 use futures_core::task::{Context, Poll}; 8 use pin_project_lite::pin_project; 9 10 pin_project! { 11 /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) 12 /// method. 13 #[must_use = "futures do nothing unless you `.await` or poll them"] 14 pub struct ForEachConcurrent<St, Fut, F> { 15 #[pin] 16 stream: Option<St>, 17 f: F, 18 futures: FuturesUnordered<Fut>, 19 limit: Option<NonZeroUsize>, 20 } 21 } 22 23 impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F> 24 where 25 St: fmt::Debug, 26 Fut: fmt::Debug, 27 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 f.debug_struct("ForEachConcurrent") 30 .field("stream", &self.stream) 31 .field("futures", &self.futures) 32 .field("limit", &self.limit) 33 .finish() 34 } 35 } 36 37 impl<St, Fut, F> ForEachConcurrent<St, Fut, F> 38 where 39 St: Stream, 40 F: FnMut(St::Item) -> Fut, 41 Fut: Future<Output = ()>, 42 { new(stream: St, limit: Option<usize>, f: F) -> Self43 pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { 44 Self { 45 stream: Some(stream), 46 // Note: `limit` = 0 gets ignored. 47 limit: limit.and_then(NonZeroUsize::new), 48 f, 49 futures: FuturesUnordered::new(), 50 } 51 } 52 } 53 54 impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> 55 where 56 St: Stream, 57 F: FnMut(St::Item) -> Fut, 58 Fut: Future<Output = ()>, 59 { is_terminated(&self) -> bool60 fn is_terminated(&self) -> bool { 61 self.stream.is_none() && self.futures.is_empty() 62 } 63 } 64 65 impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> 66 where 67 St: Stream, 68 F: FnMut(St::Item) -> Fut, 69 Fut: Future<Output = ()>, 70 { 71 type Output = (); 72 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>73 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 74 let mut this = self.project(); 75 loop { 76 let mut made_progress_this_iter = false; 77 78 // Check if we've already created a number of futures greater than `limit` 79 if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { 80 let mut stream_completed = false; 81 let elem = if let Some(stream) = this.stream.as_mut().as_pin_mut() { 82 match stream.poll_next(cx) { 83 Poll::Ready(Some(elem)) => { 84 made_progress_this_iter = true; 85 Some(elem) 86 } 87 Poll::Ready(None) => { 88 stream_completed = true; 89 None 90 } 91 Poll::Pending => None, 92 } 93 } else { 94 None 95 }; 96 if stream_completed { 97 this.stream.set(None); 98 } 99 if let Some(elem) = elem { 100 this.futures.push((this.f)(elem)); 101 } 102 } 103 104 match this.futures.poll_next_unpin(cx) { 105 Poll::Ready(Some(())) => made_progress_this_iter = true, 106 Poll::Ready(None) => { 107 if this.stream.is_none() { 108 return Poll::Ready(()); 109 } 110 } 111 Poll::Pending => {} 112 } 113 114 if !made_progress_this_iter { 115 return Poll::Pending; 116 } 117 } 118 } 119 } 120