1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::TryFuture;
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream, TryStream};
6 use futures_core::task::{Context, Poll};
7 #[cfg(feature = "sink")]
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 
11 pin_project! {
12     /// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
13     #[must_use = "streams do nothing unless polled"]
14     pub struct AndThen<St, Fut, F> {
15         #[pin]
16         stream: St,
17         #[pin]
18         future: Option<Fut>,
19         f: F,
20     }
21 }
22 
23 impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
24 where
25     St: fmt::Debug,
26     Fut: fmt::Debug,
27 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29         f.debug_struct("AndThen")
30             .field("stream", &self.stream)
31             .field("future", &self.future)
32             .finish()
33     }
34 }
35 
36 impl<St, Fut, F> AndThen<St, Fut, F>
37 where
38     St: TryStream,
39     F: FnMut(St::Ok) -> Fut,
40     Fut: TryFuture<Error = St::Error>,
41 {
new(stream: St, f: F) -> Self42     pub(super) fn new(stream: St, f: F) -> Self {
43         Self { stream, future: None, f }
44     }
45 
46     delegate_access_inner!(stream, St, ());
47 }
48 
49 impl<St, Fut, F> Stream for AndThen<St, Fut, F>
50 where
51     St: TryStream,
52     F: FnMut(St::Ok) -> Fut,
53     Fut: TryFuture<Error = St::Error>,
54 {
55     type Item = Result<Fut::Ok, St::Error>;
56 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>57     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58         let mut this = self.project();
59 
60         Poll::Ready(loop {
61             if let Some(fut) = this.future.as_mut().as_pin_mut() {
62                 let item = ready!(fut.try_poll(cx));
63                 this.future.set(None);
64                 break Some(item);
65             } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
66                 this.future.set(Some((this.f)(item)));
67             } else {
68                 break None;
69             }
70         })
71     }
72 
size_hint(&self) -> (usize, Option<usize>)73     fn size_hint(&self) -> (usize, Option<usize>) {
74         let future_len = usize::from(self.future.is_some());
75         let (lower, upper) = self.stream.size_hint();
76         let lower = lower.saturating_add(future_len);
77         let upper = match upper {
78             Some(x) => x.checked_add(future_len),
79             None => None,
80         };
81         (lower, upper)
82     }
83 }
84 
85 impl<St, Fut, F> FusedStream for AndThen<St, Fut, F>
86 where
87     St: TryStream + FusedStream,
88     F: FnMut(St::Ok) -> Fut,
89     Fut: TryFuture<Error = St::Error>,
90 {
is_terminated(&self) -> bool91     fn is_terminated(&self) -> bool {
92         self.future.is_none() && self.stream.is_terminated()
93     }
94 }
95 
96 // Forwarding impl of Sink from the underlying stream
97 #[cfg(feature = "sink")]
98 impl<S, Fut, F, Item> Sink<Item> for AndThen<S, Fut, F>
99 where
100     S: Sink<Item>,
101 {
102     type Error = S::Error;
103 
104     delegate_sink!(stream, Item);
105 }
106