1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::TryFuture; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream, TryStream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while) 13 /// method. 14 #[must_use = "streams do nothing unless polled"] 15 pub struct TryTakeWhile<St, Fut, F> 16 where 17 St: TryStream, 18 { 19 #[pin] 20 stream: St, 21 f: F, 22 #[pin] 23 pending_fut: Option<Fut>, 24 pending_item: Option<St::Ok>, 25 done_taking: bool, 26 } 27 } 28 29 impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F> 30 where 31 St: TryStream + fmt::Debug, 32 St::Ok: fmt::Debug, 33 Fut: fmt::Debug, 34 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 36 f.debug_struct("TryTakeWhile") 37 .field("stream", &self.stream) 38 .field("pending_fut", &self.pending_fut) 39 .field("pending_item", &self.pending_item) 40 .field("done_taking", &self.done_taking) 41 .finish() 42 } 43 } 44 45 impl<St, Fut, F> TryTakeWhile<St, Fut, F> 46 where 47 St: TryStream, 48 F: FnMut(&St::Ok) -> Fut, 49 Fut: TryFuture<Ok = bool, Error = St::Error>, 50 { new(stream: St, f: F) -> Self51 pub(super) fn new(stream: St, f: F) -> Self { 52 Self { stream, f, pending_fut: None, pending_item: None, done_taking: false } 53 } 54 55 delegate_access_inner!(stream, St, ()); 56 } 57 58 impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F> 59 where 60 St: TryStream, 61 F: FnMut(&St::Ok) -> Fut, 62 Fut: TryFuture<Ok = bool, Error = St::Error>, 63 { 64 type Item = Result<St::Ok, St::Error>; 65 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 67 let mut this = self.project(); 68 69 if *this.done_taking { 70 return Poll::Ready(None); 71 } 72 73 Poll::Ready(loop { 74 if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { 75 let res = ready!(fut.try_poll(cx)); 76 this.pending_fut.set(None); 77 let take = res?; 78 let item = this.pending_item.take(); 79 if take { 80 break item.map(Ok); 81 } else { 82 *this.done_taking = true; 83 break None; 84 } 85 } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { 86 this.pending_fut.set(Some((this.f)(&item))); 87 *this.pending_item = Some(item); 88 } else { 89 break None; 90 } 91 }) 92 } 93 size_hint(&self) -> (usize, Option<usize>)94 fn size_hint(&self) -> (usize, Option<usize>) { 95 if self.done_taking { 96 return (0, Some(0)); 97 } 98 99 let pending_len = usize::from(self.pending_item.is_some()); 100 let (_, upper) = self.stream.size_hint(); 101 let upper = match upper { 102 Some(x) => x.checked_add(pending_len), 103 None => None, 104 }; 105 (0, upper) // can't know a lower bound, due to the predicate 106 } 107 } 108 109 impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F> 110 where 111 St: TryStream + FusedStream, 112 F: FnMut(&St::Ok) -> Fut, 113 Fut: TryFuture<Ok = bool, Error = St::Error>, 114 { is_terminated(&self) -> bool115 fn is_terminated(&self) -> bool { 116 self.done_taking || self.pending_item.is_none() && self.stream.is_terminated() 117 } 118 } 119 120 // Forwarding impl of Sink from the underlying stream 121 #[cfg(feature = "sink")] 122 impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F> 123 where 124 S: TryStream + Sink<Item, Error = E>, 125 { 126 type Error = E; 127 128 delegate_sink!(stream, Item); 129 } 130