1 use crate::stream::{FuturesUnordered, StreamExt}; 2 use core::fmt; 3 use core::mem; 4 use core::num::NonZeroUsize; 5 use core::pin::Pin; 6 use futures_core::future::{FusedFuture, Future}; 7 use futures_core::stream::TryStream; 8 use futures_core::task::{Context, Poll}; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Future for the 13 /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) 14 /// method. 15 #[must_use = "futures do nothing unless you `.await` or poll them"] 16 pub struct TryForEachConcurrent<St, Fut, F> { 17 #[pin] 18 stream: Option<St>, 19 f: F, 20 futures: FuturesUnordered<Fut>, 21 limit: Option<NonZeroUsize>, 22 } 23 } 24 25 impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F> 26 where 27 St: fmt::Debug, 28 Fut: fmt::Debug, 29 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 31 f.debug_struct("TryForEachConcurrent") 32 .field("stream", &self.stream) 33 .field("futures", &self.futures) 34 .field("limit", &self.limit) 35 .finish() 36 } 37 } 38 39 impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F> 40 where 41 St: TryStream, 42 F: FnMut(St::Ok) -> Fut, 43 Fut: Future<Output = Result<(), St::Error>>, 44 { is_terminated(&self) -> bool45 fn is_terminated(&self) -> bool { 46 self.stream.is_none() && self.futures.is_empty() 47 } 48 } 49 50 impl<St, Fut, F> TryForEachConcurrent<St, Fut, F> 51 where 52 St: TryStream, 53 F: FnMut(St::Ok) -> Fut, 54 Fut: Future<Output = Result<(), St::Error>>, 55 { new(stream: St, limit: Option<usize>, f: F) -> Self56 pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { 57 Self { 58 stream: Some(stream), 59 // Note: `limit` = 0 gets ignored. 60 limit: limit.and_then(NonZeroUsize::new), 61 f, 62 futures: FuturesUnordered::new(), 63 } 64 } 65 } 66 67 impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> 68 where 69 St: TryStream, 70 F: FnMut(St::Ok) -> Fut, 71 Fut: Future<Output = Result<(), St::Error>>, 72 { 73 type Output = Result<(), St::Error>; 74 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>75 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 76 let mut this = self.project(); 77 loop { 78 let mut made_progress_this_iter = false; 79 80 // Check if we've already created a number of futures greater than `limit` 81 if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { 82 let poll_res = match this.stream.as_mut().as_pin_mut() { 83 Some(stream) => stream.try_poll_next(cx), 84 None => Poll::Ready(None), 85 }; 86 87 let elem = match poll_res { 88 Poll::Ready(Some(Ok(elem))) => { 89 made_progress_this_iter = true; 90 Some(elem) 91 } 92 Poll::Ready(None) => { 93 this.stream.set(None); 94 None 95 } 96 Poll::Pending => None, 97 Poll::Ready(Some(Err(e))) => { 98 // Empty the stream and futures so that we know 99 // the future has completed. 100 this.stream.set(None); 101 drop(mem::replace(this.futures, FuturesUnordered::new())); 102 return Poll::Ready(Err(e)); 103 } 104 }; 105 106 if let Some(elem) = elem { 107 this.futures.push((this.f)(elem)); 108 } 109 } 110 111 match this.futures.poll_next_unpin(cx) { 112 Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, 113 Poll::Ready(None) => { 114 if this.stream.is_none() { 115 return Poll::Ready(Ok(())); 116 } 117 } 118 Poll::Pending => {} 119 Poll::Ready(Some(Err(e))) => { 120 // Empty the stream and futures so that we know 121 // the future has completed. 122 this.stream.set(None); 123 drop(mem::replace(this.futures, FuturesUnordered::new())); 124 return Poll::Ready(Err(e)); 125 } 126 } 127 128 if !made_progress_this_iter { 129 return Poll::Pending; 130 } 131 } 132 } 133 } 134