1 use crate::stream::{FuturesUnordered, StreamExt};
2 use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
3 use core::cmp::Ordering;
4 use core::fmt::{self, Debug};
5 use core::iter::FromIterator;
6 use core::pin::Pin;
7 use futures_core::future::Future;
8 use futures_core::ready;
9 use futures_core::stream::Stream;
10 use futures_core::{
11     task::{Context, Poll},
12     FusedStream,
13 };
14 use pin_project_lite::pin_project;
15 
pin_project! {
    /// Pairs a value with the position it occupies in the queue.
    ///
    /// The same wrapper carries a pending future and, once that future
    /// resolves, its output. Equality and ordering look at `index` only.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[derive(Debug)]
    struct OrderWrapper<T> {
        // Structurally pinned so that a wrapped future can be polled in place.
        #[pin]
        data: T, // A future or a future's output
        // Use i64 for index since isize may overflow in 32-bit targets.
        // Signed because `push_front` hands out indices below the current front.
        index: i64,
    }
}
26 
27 impl<T> PartialEq for OrderWrapper<T> {
eq(&self, other: &Self) -> bool28     fn eq(&self, other: &Self) -> bool {
29         self.index == other.index
30     }
31 }
32 
// Equality is decided by `index` alone (see the `PartialEq` impl), and `i64`
// equality is a full equivalence relation, so `Eq` holds for every `T`.
impl<T> Eq for OrderWrapper<T> {}
34 
35 impl<T> PartialOrd for OrderWrapper<T> {
partial_cmp(&self, other: &Self) -> Option<Ordering>36     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
37         Some(self.cmp(other))
38     }
39 }
40 
41 impl<T> Ord for OrderWrapper<T> {
cmp(&self, other: &Self) -> Ordering42     fn cmp(&self, other: &Self) -> Ordering {
43         // BinaryHeap is a max heap, so compare backwards here.
44         other.index.cmp(&self.index)
45     }
46 }
47 
48 impl<T> Future for OrderWrapper<T>
49 where
50     T: Future,
51 {
52     type Output = OrderWrapper<T::Output>;
53 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>54     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55         let index = self.index;
56         self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index })
57     }
58 }
59 
/// An unbounded queue of futures.
///
/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO
/// order on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
///
/// Futures are pushed into this queue and their realized values are yielded in
/// order. This structure is optimized to manage a large number of futures.
/// Futures managed by [`FuturesOrdered`] will only be polled when they generate
/// notifications. This reduces the required amount of work needed to coordinate
/// large numbers of futures.
///
/// When a [`FuturesOrdered`] is first created, it does not contain any futures.
/// Calling [`poll_next`](FuturesOrdered::poll_next) in this state will return
/// [`Poll::Ready(None)`](Poll::Ready). Futures are submitted
/// to the queue using [`push_back`](FuturesOrdered::push_back) (or
/// [`push_front`](FuturesOrdered::push_front)); however, the future will
/// **not** be polled at this point. [`FuturesOrdered`] will only poll managed
/// futures when [`FuturesOrdered::poll_next`] is called. As such, it
/// is important to call [`poll_next`](FuturesOrdered::poll_next) after pushing
/// new futures.
///
/// If [`FuturesOrdered::poll_next`] returns [`Poll::Ready(None)`](Poll::Ready)
/// this means that the queue is currently not managing any futures. A future
/// may be submitted to the queue at a later time. At that point, a call to
/// [`FuturesOrdered::poll_next`] will either return the future's resolved value
/// **or** [`Poll::Pending`] if the future has not yet completed. When
/// multiple futures are submitted to the queue, [`FuturesOrdered::poll_next`]
/// will return [`Poll::Pending`] until the first future completes, even if
/// some of the later futures have already completed.
///
/// Note that you can create a ready-made [`FuturesOrdered`] via the
/// [`collect`](Iterator::collect) method, or you can start with an empty queue
/// with the [`FuturesOrdered::new`] constructor.
///
/// This type is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrdered<T: Future> {
    // Futures that have not produced an output yet, each tagged with its
    // queue position.
    in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
    // Outputs that arrived before their turn. The heap surfaces the smallest
    // index first because `OrderWrapper` compares in reverse.
    queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
    // Index that the next `push_back` will assign.
    next_incoming_index: i64,
    // Index of the output `poll_next` must yield next; `push_front` lowers it.
    next_outgoing_index: i64,
}
105 
// `poll_next` only ever reaches the fields through a plain `&mut Self` and
// never pin-projects into them, so the queue may be moved freely whatever `T`
// is. NOTE(review): this presumably relies on `FuturesUnordered` keeping the
// wrapped futures at stable addresses internally; confirm against its
// implementation.
impl<T: Future> Unpin for FuturesOrdered<T> {}
107 
108 impl<Fut: Future> FuturesOrdered<Fut> {
109     /// Constructs a new, empty `FuturesOrdered`
110     ///
111     /// The returned [`FuturesOrdered`] does not contain any futures and, in
112     /// this state, [`FuturesOrdered::poll_next`] will return
113     /// [`Poll::Ready(None)`](Poll::Ready).
new() -> Self114     pub fn new() -> Self {
115         Self {
116             in_progress_queue: FuturesUnordered::new(),
117             queued_outputs: BinaryHeap::new(),
118             next_incoming_index: 0,
119             next_outgoing_index: 0,
120         }
121     }
122 
123     /// Returns the number of futures contained in the queue.
124     ///
125     /// This represents the total number of in-flight futures, both
126     /// those currently processing and those that have completed but
127     /// which are waiting for earlier futures to complete.
len(&self) -> usize128     pub fn len(&self) -> usize {
129         self.in_progress_queue.len() + self.queued_outputs.len()
130     }
131 
132     /// Returns `true` if the queue contains no futures
is_empty(&self) -> bool133     pub fn is_empty(&self) -> bool {
134         self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
135     }
136 
137     /// Push a future into the queue.
138     ///
139     /// This function submits the given future to the internal set for managing.
140     /// This function will not call [`poll`](Future::poll) on the submitted
141     /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
142     /// called in order to receive task notifications.
143     #[deprecated(note = "use `push_back` instead")]
push(&mut self, future: Fut)144     pub fn push(&mut self, future: Fut) {
145         self.push_back(future);
146     }
147 
148     /// Pushes a future to the back of the queue.
149     ///
150     /// This function submits the given future to the internal set for managing.
151     /// This function will not call [`poll`](Future::poll) on the submitted
152     /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
153     /// called in order to receive task notifications.
push_back(&mut self, future: Fut)154     pub fn push_back(&mut self, future: Fut) {
155         let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
156         self.next_incoming_index += 1;
157         self.in_progress_queue.push(wrapped);
158     }
159 
160     /// Pushes a future to the front of the queue.
161     ///
162     /// This function submits the given future to the internal set for managing.
163     /// This function will not call [`poll`](Future::poll) on the submitted
164     /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
165     /// called in order to receive task notifications. This future will be
166     /// the next future to be returned complete.
push_front(&mut self, future: Fut)167     pub fn push_front(&mut self, future: Fut) {
168         let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
169         self.next_outgoing_index -= 1;
170         self.in_progress_queue.push(wrapped);
171     }
172 }
173 
174 impl<Fut: Future> Default for FuturesOrdered<Fut> {
default() -> Self175     fn default() -> Self {
176         Self::new()
177     }
178 }
179 
180 impl<Fut: Future> Stream for FuturesOrdered<Fut> {
181     type Item = Fut::Output;
182 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>183     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
184         let this = &mut *self;
185 
186         // Check to see if we've already received the next value
187         if let Some(next_output) = this.queued_outputs.peek_mut() {
188             if next_output.index == this.next_outgoing_index {
189                 this.next_outgoing_index += 1;
190                 return Poll::Ready(Some(PeekMut::pop(next_output).data));
191             }
192         }
193 
194         loop {
195             match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
196                 Some(output) => {
197                     if output.index == this.next_outgoing_index {
198                         this.next_outgoing_index += 1;
199                         return Poll::Ready(Some(output.data));
200                     } else {
201                         this.queued_outputs.push(output)
202                     }
203                 }
204                 None => return Poll::Ready(None),
205             }
206         }
207     }
208 
size_hint(&self) -> (usize, Option<usize>)209     fn size_hint(&self) -> (usize, Option<usize>) {
210         let len = self.len();
211         (len, Some(len))
212     }
213 }
214 
215 impl<Fut: Future> Debug for FuturesOrdered<Fut> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result216     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217         write!(f, "FuturesOrdered {{ ... }}")
218     }
219 }
220 
221 impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = Fut>,222     fn from_iter<T>(iter: T) -> Self
223     where
224         T: IntoIterator<Item = Fut>,
225     {
226         let acc = Self::new();
227         iter.into_iter().fold(acc, |mut acc, item| {
228             acc.push_back(item);
229             acc
230         })
231     }
232 }
233 
234 impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
is_terminated(&self) -> bool235     fn is_terminated(&self) -> bool {
236         self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
237     }
238 }
239 
240 impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
extend<I>(&mut self, iter: I) where I: IntoIterator<Item = Fut>,241     fn extend<I>(&mut self, iter: I)
242     where
243         I: IntoIterator<Item = Fut>,
244     {
245         for item in iter {
246             self.push_back(item);
247         }
248     }
249 }
250