1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::Future;
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream};
6 use futures_core::task::{Context, Poll};
7 #[cfg(feature = "sink")]
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 
11 pin_project! {
12     /// Stream for the [`then`](super::StreamExt::then) method.
13     #[must_use = "streams do nothing unless polled"]
14     pub struct Then<St, Fut, F> {
15         #[pin]
16         stream: St,
17         #[pin]
18         future: Option<Fut>,
19         f: F,
20     }
21 }
22 
23 impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
24 where
25     St: fmt::Debug,
26     Fut: fmt::Debug,
27 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29         f.debug_struct("Then").field("stream", &self.stream).field("future", &self.future).finish()
30     }
31 }
32 
33 impl<St, Fut, F> Then<St, Fut, F>
34 where
35     St: Stream,
36     F: FnMut(St::Item) -> Fut,
37 {
new(stream: St, f: F) -> Self38     pub(super) fn new(stream: St, f: F) -> Self {
39         Self { stream, future: None, f }
40     }
41 
42     delegate_access_inner!(stream, St, ());
43 }
44 
45 impl<St, Fut, F> FusedStream for Then<St, Fut, F>
46 where
47     St: FusedStream,
48     F: FnMut(St::Item) -> Fut,
49     Fut: Future,
50 {
is_terminated(&self) -> bool51     fn is_terminated(&self) -> bool {
52         self.future.is_none() && self.stream.is_terminated()
53     }
54 }
55 
56 impl<St, Fut, F> Stream for Then<St, Fut, F>
57 where
58     St: Stream,
59     F: FnMut(St::Item) -> Fut,
60     Fut: Future,
61 {
62     type Item = Fut::Output;
63 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>64     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65         let mut this = self.project();
66 
67         Poll::Ready(loop {
68             if let Some(fut) = this.future.as_mut().as_pin_mut() {
69                 let item = ready!(fut.poll(cx));
70                 this.future.set(None);
71                 break Some(item);
72             } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
73                 this.future.set(Some((this.f)(item)));
74             } else {
75                 break None;
76             }
77         })
78     }
79 
size_hint(&self) -> (usize, Option<usize>)80     fn size_hint(&self) -> (usize, Option<usize>) {
81         let future_len = usize::from(self.future.is_some());
82         let (lower, upper) = self.stream.size_hint();
83         let lower = lower.saturating_add(future_len);
84         let upper = match upper {
85             Some(x) => x.checked_add(future_len),
86             None => None,
87         };
88         (lower, upper)
89     }
90 }
91 
92 // Forwarding impl of Sink from the underlying stream
93 #[cfg(feature = "sink")]
94 impl<S, Fut, F, Item> Sink<Item> for Then<S, Fut, F>
95 where
96     S: Sink<Item>,
97 {
98     type Error = S::Error;
99 
100     delegate_sink!(stream, Item);
101 }
102