1 use crate::stream::{Fuse, FuturesUnordered, StreamExt}; 2 use core::fmt; 3 use core::pin::Pin; 4 use futures_core::future::Future; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) 13 /// method. 14 #[must_use = "streams do nothing unless polled"] 15 pub struct BufferUnordered<St> 16 where 17 St: Stream, 18 { 19 #[pin] 20 stream: Fuse<St>, 21 in_progress_queue: FuturesUnordered<St::Item>, 22 max: usize, 23 } 24 } 25 26 impl<St> fmt::Debug for BufferUnordered<St> 27 where 28 St: Stream + fmt::Debug, 29 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 31 f.debug_struct("BufferUnordered") 32 .field("stream", &self.stream) 33 .field("in_progress_queue", &self.in_progress_queue) 34 .field("max", &self.max) 35 .finish() 36 } 37 } 38 39 impl<St> BufferUnordered<St> 40 where 41 St: Stream, 42 St::Item: Future, 43 { new(stream: St, n: usize) -> Self44 pub(super) fn new(stream: St, n: usize) -> Self { 45 Self { 46 stream: super::Fuse::new(stream), 47 in_progress_queue: FuturesUnordered::new(), 48 max: n, 49 } 50 } 51 52 delegate_access_inner!(stream, St, (.)); 53 } 54 55 impl<St> Stream for BufferUnordered<St> 56 where 57 St: Stream, 58 St::Item: Future, 59 { 60 type Item = <St::Item as Future>::Output; 61 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>62 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 63 let mut this = self.project(); 64 65 // First up, try to spawn off as many futures as possible by filling up 66 // our queue of futures. 67 while this.in_progress_queue.len() < *this.max { 68 match this.stream.as_mut().poll_next(cx) { 69 Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), 70 Poll::Ready(None) | Poll::Pending => break, 71 } 72 } 73 74 // Attempt to pull the next value from the in_progress_queue 75 match this.in_progress_queue.poll_next_unpin(cx) { 76 x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, 77 Poll::Ready(None) => {} 78 } 79 80 // If more values are still coming from the stream, we're not done yet 81 if this.stream.is_done() { 82 Poll::Ready(None) 83 } else { 84 Poll::Pending 85 } 86 } 87 size_hint(&self) -> (usize, Option<usize>)88 fn size_hint(&self) -> (usize, Option<usize>) { 89 let queue_len = self.in_progress_queue.len(); 90 let (lower, upper) = self.stream.size_hint(); 91 let lower = lower.saturating_add(queue_len); 92 let upper = match upper { 93 Some(x) => x.checked_add(queue_len), 94 None => None, 95 }; 96 (lower, upper) 97 } 98 } 99 100 impl<St> FusedStream for BufferUnordered<St> 101 where 102 St: Stream, 103 St::Item: Future, 104 { is_terminated(&self) -> bool105 fn is_terminated(&self) -> bool { 106 self.in_progress_queue.is_terminated() && self.stream.is_terminated() 107 } 108 } 109 110 // Forwarding impl of Sink from the underlying stream 111 #[cfg(feature = "sink")] 112 impl<S, Item> Sink<Item> for BufferUnordered<S> 113 where 114 S: Stream + Sink<Item>, 115 S::Item: Future, 116 { 117 type Error = S::Error; 118 119 delegate_sink!(stream, Item); 120 } 121