1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::TryFuture; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream, TryStream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`and_then`](super::TryStreamExt::and_then) method. 13 #[must_use = "streams do nothing unless polled"] 14 pub struct AndThen<St, Fut, F> { 15 #[pin] 16 stream: St, 17 #[pin] 18 future: Option<Fut>, 19 f: F, 20 } 21 } 22 23 impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F> 24 where 25 St: fmt::Debug, 26 Fut: fmt::Debug, 27 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 f.debug_struct("AndThen") 30 .field("stream", &self.stream) 31 .field("future", &self.future) 32 .finish() 33 } 34 } 35 36 impl<St, Fut, F> AndThen<St, Fut, F> 37 where 38 St: TryStream, 39 F: FnMut(St::Ok) -> Fut, 40 Fut: TryFuture<Error = St::Error>, 41 { new(stream: St, f: F) -> Self42 pub(super) fn new(stream: St, f: F) -> Self { 43 Self { stream, future: None, f } 44 } 45 46 delegate_access_inner!(stream, St, ()); 47 } 48 49 impl<St, Fut, F> Stream for AndThen<St, Fut, F> 50 where 51 St: TryStream, 52 F: FnMut(St::Ok) -> Fut, 53 Fut: TryFuture<Error = St::Error>, 54 { 55 type Item = Result<Fut::Ok, St::Error>; 56 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>57 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 58 let mut this = self.project(); 59 60 Poll::Ready(loop { 61 if let Some(fut) = this.future.as_mut().as_pin_mut() { 62 let item = ready!(fut.try_poll(cx)); 63 this.future.set(None); 64 break Some(item); 65 } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { 66 this.future.set(Some((this.f)(item))); 67 } else { 68 break None; 69 } 70 }) 71 } 72 size_hint(&self) -> (usize, Option<usize>)73 fn size_hint(&self) -> (usize, Option<usize>) { 74 let future_len = usize::from(self.future.is_some()); 75 let (lower, upper) = self.stream.size_hint(); 76 let lower = lower.saturating_add(future_len); 77 let upper = match upper { 78 Some(x) => x.checked_add(future_len), 79 None => None, 80 }; 81 (lower, upper) 82 } 83 } 84 85 impl<St, Fut, F> FusedStream for AndThen<St, Fut, F> 86 where 87 St: TryStream + FusedStream, 88 F: FnMut(St::Ok) -> Fut, 89 Fut: TryFuture<Error = St::Error>, 90 { is_terminated(&self) -> bool91 fn is_terminated(&self) -> bool { 92 self.future.is_none() && self.stream.is_terminated() 93 } 94 } 95 96 // Forwarding impl of Sink from the underlying stream 97 #[cfg(feature = "sink")] 98 impl<S, Fut, F, Item> Sink<Item> for AndThen<S, Fut, F> 99 where 100 S: Sink<Item>, 101 { 102 type Error = S::Error; 103 104 delegate_sink!(stream, Item); 105 } 106