1 use std::pin::Pin; 2 use std::task::{Context, Poll}; 3 4 use futures_core::Stream; 5 use pin_project_lite::pin_project; 6 7 use crate::stream_ext::Fuse; 8 use crate::StreamExt; 9 10 pin_project! { 11 /// Stream returned by the [`chain`](super::StreamExt::peekable) method. 12 pub struct Peekable<T: Stream> { 13 peek: Option<T::Item>, 14 #[pin] 15 stream: Fuse<T>, 16 } 17 } 18 19 impl<T: Stream> Peekable<T> { new(stream: T) -> Self20 pub(crate) fn new(stream: T) -> Self { 21 let stream = stream.fuse(); 22 Self { peek: None, stream } 23 } 24 25 /// Peek at the next item in the stream. peek(&mut self) -> Option<&T::Item> where T: Unpin,26 pub async fn peek(&mut self) -> Option<&T::Item> 27 where 28 T: Unpin, 29 { 30 if let Some(ref it) = self.peek { 31 Some(it) 32 } else { 33 self.peek = self.next().await; 34 self.peek.as_ref() 35 } 36 } 37 } 38 39 impl<T: Stream> Stream for Peekable<T> { 40 type Item = T::Item; 41 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>42 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 43 let this = self.project(); 44 if let Some(it) = this.peek.take() { 45 Poll::Ready(Some(it)) 46 } else { 47 this.stream.poll_next(cx) 48 } 49 } 50 } 51