1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::Future; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`then`](super::StreamExt::then) method. 13 #[must_use = "streams do nothing unless polled"] 14 pub struct Then<St, Fut, F> { 15 #[pin] 16 stream: St, 17 #[pin] 18 future: Option<Fut>, 19 f: F, 20 } 21 } 22 23 impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> 24 where 25 St: fmt::Debug, 26 Fut: fmt::Debug, 27 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 f.debug_struct("Then").field("stream", &self.stream).field("future", &self.future).finish() 30 } 31 } 32 33 impl<St, Fut, F> Then<St, Fut, F> 34 where 35 St: Stream, 36 F: FnMut(St::Item) -> Fut, 37 { new(stream: St, f: F) -> Self38 pub(super) fn new(stream: St, f: F) -> Self { 39 Self { stream, future: None, f } 40 } 41 42 delegate_access_inner!(stream, St, ()); 43 } 44 45 impl<St, Fut, F> FusedStream for Then<St, Fut, F> 46 where 47 St: FusedStream, 48 F: FnMut(St::Item) -> Fut, 49 Fut: Future, 50 { is_terminated(&self) -> bool51 fn is_terminated(&self) -> bool { 52 self.future.is_none() && self.stream.is_terminated() 53 } 54 } 55 56 impl<St, Fut, F> Stream for Then<St, Fut, F> 57 where 58 St: Stream, 59 F: FnMut(St::Item) -> Fut, 60 Fut: Future, 61 { 62 type Item = Fut::Output; 63 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>64 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 65 let mut this = self.project(); 66 67 Poll::Ready(loop { 68 if let Some(fut) = this.future.as_mut().as_pin_mut() { 69 let item = ready!(fut.poll(cx)); 70 this.future.set(None); 71 break Some(item); 72 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { 73 this.future.set(Some((this.f)(item))); 74 } else { 75 break None; 76 } 77 }) 78 } 79 size_hint(&self) -> (usize, Option<usize>)80 fn size_hint(&self) -> (usize, Option<usize>) { 81 let future_len = usize::from(self.future.is_some()); 82 let (lower, upper) = self.stream.size_hint(); 83 let lower = lower.saturating_add(future_len); 84 let upper = match upper { 85 Some(x) => x.checked_add(future_len), 86 None => None, 87 }; 88 (lower, upper) 89 } 90 } 91 92 // Forwarding impl of Sink from the underlying stream 93 #[cfg(feature = "sink")] 94 impl<S, Fut, F, Item> Sink<Item> for Then<S, Fut, F> 95 where 96 S: Sink<Item>, 97 { 98 type Error = S::Error; 99 100 delegate_sink!(stream, Item); 101 } 102