1 use crate::fns::FnMut1; 2 use core::fmt; 3 use core::pin::Pin; 4 use futures_core::future::Future; 5 use futures_core::ready; 6 use futures_core::stream::{FusedStream, Stream}; 7 use futures_core::task::{Context, Poll}; 8 #[cfg(feature = "sink")] 9 use futures_sink::Sink; 10 use pin_project_lite::pin_project; 11 12 pin_project! { 13 /// Stream for the [`filter`](super::StreamExt::filter) method. 14 #[must_use = "streams do nothing unless polled"] 15 pub struct Filter<St, Fut, F> 16 where St: Stream, 17 { 18 #[pin] 19 stream: St, 20 f: F, 21 #[pin] 22 pending_fut: Option<Fut>, 23 pending_item: Option<St::Item>, 24 } 25 } 26 27 impl<St, Fut, F> fmt::Debug for Filter<St, Fut, F> 28 where 29 St: Stream + fmt::Debug, 30 St::Item: fmt::Debug, 31 Fut: fmt::Debug, 32 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 34 f.debug_struct("Filter") 35 .field("stream", &self.stream) 36 .field("pending_fut", &self.pending_fut) 37 .field("pending_item", &self.pending_item) 38 .finish() 39 } 40 } 41 42 #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 43 impl<St, Fut, F> Filter<St, Fut, F> 44 where 45 St: Stream, 46 F: for<'a> FnMut1<&'a St::Item, Output = Fut>, 47 Fut: Future<Output = bool>, 48 { new(stream: St, f: F) -> Self49 pub(super) fn new(stream: St, f: F) -> Self { 50 Self { stream, f, pending_fut: None, pending_item: None } 51 } 52 53 delegate_access_inner!(stream, St, ()); 54 } 55 56 impl<St, Fut, F> FusedStream for Filter<St, Fut, F> 57 where 58 St: Stream + FusedStream, 59 F: FnMut(&St::Item) -> Fut, 60 Fut: Future<Output = bool>, 61 { is_terminated(&self) -> bool62 fn is_terminated(&self) -> bool { 63 self.pending_fut.is_none() && self.stream.is_terminated() 64 } 65 } 66 67 #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 68 impl<St, Fut, F> Stream for Filter<St, Fut, F> 69 where 70 St: Stream, 71 F: for<'a> FnMut1<&'a St::Item, Output = Fut>, 72 Fut: Future<Output = bool>, 73 { 74 type Item = St::Item; 75 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>>76 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { 77 let mut this = self.project(); 78 Poll::Ready(loop { 79 if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { 80 let res = ready!(fut.poll(cx)); 81 this.pending_fut.set(None); 82 if res { 83 break this.pending_item.take(); 84 } 85 *this.pending_item = None; 86 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { 87 this.pending_fut.set(Some(this.f.call_mut(&item))); 88 *this.pending_item = Some(item); 89 } else { 90 break None; 91 } 92 }) 93 } 94 size_hint(&self) -> (usize, Option<usize>)95 fn size_hint(&self) -> (usize, Option<usize>) { 96 let pending_len = usize::from(self.pending_item.is_some()); 97 let (_, upper) = self.stream.size_hint(); 98 let upper = match upper { 99 Some(x) => x.checked_add(pending_len), 100 None => None, 101 }; 102 (0, upper) // can't know a lower bound, due to the predicate 103 } 104 } 105 106 // Forwarding impl of Sink from the underlying stream 107 #[cfg(feature = "sink")] 108 impl<S, Fut, F, Item> Sink<Item> for Filter<S, Fut, F> 109 where 110 S: Stream + Sink<Item>, 111 F: FnMut(&S::Item) -> Fut, 112 Fut: Future<Output = bool>, 113 { 114 type Error = S::Error; 115 116 delegate_sink!(stream, Item); 117 } 118