1 use core::pin::Pin;
2 use futures_core::ready;
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_core::task::{Context, Poll};
5 use pin_project_lite::pin_project;
6 
7 pin_project! {
8     /// Stream for the [`cycle`](super::StreamExt::cycle) method.
9     #[derive(Debug)]
10     #[must_use = "streams do nothing unless polled"]
11     pub struct Cycle<St> {
12         orig: St,
13         #[pin]
14         stream: St,
15     }
16 }
17 
18 impl<St> Cycle<St>
19 where
20     St: Clone + Stream,
21 {
new(stream: St) -> Self22     pub(super) fn new(stream: St) -> Self {
23         Self { orig: stream.clone(), stream }
24     }
25 }
26 
27 impl<St> Stream for Cycle<St>
28 where
29     St: Clone + Stream,
30 {
31     type Item = St::Item;
32 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>33     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34         let mut this = self.project();
35 
36         match ready!(this.stream.as_mut().poll_next(cx)) {
37             None => {
38                 this.stream.set(this.orig.clone());
39                 this.stream.poll_next(cx)
40             }
41             item => Poll::Ready(item),
42         }
43     }
44 
size_hint(&self) -> (usize, Option<usize>)45     fn size_hint(&self) -> (usize, Option<usize>) {
46         // the cycle stream is either empty or infinite
47         match self.orig.size_hint() {
48             size @ (0, Some(0)) => size,
49             (0, _) => (0, None),
50             _ => (usize::MAX, None),
51         }
52     }
53 }
54 
55 impl<St> FusedStream for Cycle<St>
56 where
57     St: Clone + Stream,
58 {
is_terminated(&self) -> bool59     fn is_terminated(&self) -> bool {
60         // the cycle stream is either empty or infinite
61         if let (0, Some(0)) = self.size_hint() {
62             true
63         } else {
64             false
65         }
66     }
67 }
68