1 use core::pin::Pin; 2 use futures_core::ready; 3 use futures_core::stream::{FusedStream, Stream}; 4 use futures_core::task::{Context, Poll}; 5 #[cfg(feature = "sink")] 6 use futures_sink::Sink; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Stream for the [`fuse`](super::StreamExt::fuse) method. 11 #[derive(Debug)] 12 #[must_use = "streams do nothing unless polled"] 13 pub struct Fuse<St> { 14 #[pin] 15 stream: St, 16 done: bool, 17 } 18 } 19 20 impl<St> Fuse<St> { new(stream: St) -> Self21 pub(super) fn new(stream: St) -> Self { 22 Self { stream, done: false } 23 } 24 25 /// Returns whether the underlying stream has finished or not. 26 /// 27 /// If this method returns `true`, then all future calls to poll are 28 /// guaranteed to return `None`. If this returns `false`, then the 29 /// underlying stream is still in use. is_done(&self) -> bool30 pub fn is_done(&self) -> bool { 31 self.done 32 } 33 34 delegate_access_inner!(stream, St, ()); 35 } 36 37 impl<S: Stream> FusedStream for Fuse<S> { is_terminated(&self) -> bool38 fn is_terminated(&self) -> bool { 39 self.done 40 } 41 } 42 43 impl<S: Stream> Stream for Fuse<S> { 44 type Item = S::Item; 45 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { 47 let this = self.project(); 48 49 if *this.done { 50 return Poll::Ready(None); 51 } 52 53 let item = ready!(this.stream.poll_next(cx)); 54 if item.is_none() { 55 *this.done = true; 56 } 57 Poll::Ready(item) 58 } 59 size_hint(&self) -> (usize, Option<usize>)60 fn size_hint(&self) -> (usize, Option<usize>) { 61 if self.done { 62 (0, Some(0)) 63 } else { 64 self.stream.size_hint() 65 } 66 } 67 } 68 69 // Forwarding impl of Sink from the underlying stream 70 #[cfg(feature = "sink")] 71 impl<S: Stream + Sink<Item>, Item> Sink<Item> for Fuse<S> { 72 type Error = S::Error; 73 74 delegate_sink!(stream, Item); 75 } 76