1 use std::pin::Pin;
2 use std::task::{Context, Poll};
3 
4 use futures_core::Stream;
5 use pin_project_lite::pin_project;
6 
7 use crate::stream_ext::Fuse;
8 use crate::StreamExt;
9 
10 pin_project! {
11     /// Stream returned by the [`chain`](super::StreamExt::peekable) method.
12     pub struct Peekable<T: Stream> {
13         peek: Option<T::Item>,
14         #[pin]
15         stream: Fuse<T>,
16     }
17 }
18 
19 impl<T: Stream> Peekable<T> {
new(stream: T) -> Self20     pub(crate) fn new(stream: T) -> Self {
21         let stream = stream.fuse();
22         Self { peek: None, stream }
23     }
24 
25     /// Peek at the next item in the stream.
peek(&mut self) -> Option<&T::Item> where T: Unpin,26     pub async fn peek(&mut self) -> Option<&T::Item>
27     where
28         T: Unpin,
29     {
30         if let Some(ref it) = self.peek {
31             Some(it)
32         } else {
33             self.peek = self.next().await;
34             self.peek.as_ref()
35         }
36     }
37 }
38 
39 impl<T: Stream> Stream for Peekable<T> {
40     type Item = T::Item;
41 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>42     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43         let this = self.project();
44         if let Some(it) = this.peek.take() {
45             Poll::Ready(Some(it))
46         } else {
47             this.stream.poll_next(cx)
48         }
49     }
50 }
51