1 use core::pin::Pin; 2 use futures_core::ready; 3 use futures_core::stream::{FusedStream, Stream}; 4 use futures_core::task::{Context, Poll}; 5 use pin_project_lite::pin_project; 6 7 pin_project! { 8 /// Stream for the [`cycle`](super::StreamExt::cycle) method. 9 #[derive(Debug)] 10 #[must_use = "streams do nothing unless polled"] 11 pub struct Cycle<St> { 12 orig: St, 13 #[pin] 14 stream: St, 15 } 16 } 17 18 impl<St> Cycle<St> 19 where 20 St: Clone + Stream, 21 { new(stream: St) -> Self22 pub(super) fn new(stream: St) -> Self { 23 Self { orig: stream.clone(), stream } 24 } 25 } 26 27 impl<St> Stream for Cycle<St> 28 where 29 St: Clone + Stream, 30 { 31 type Item = St::Item; 32 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>33 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 34 let mut this = self.project(); 35 36 match ready!(this.stream.as_mut().poll_next(cx)) { 37 None => { 38 this.stream.set(this.orig.clone()); 39 this.stream.poll_next(cx) 40 } 41 item => Poll::Ready(item), 42 } 43 } 44 size_hint(&self) -> (usize, Option<usize>)45 fn size_hint(&self) -> (usize, Option<usize>) { 46 // the cycle stream is either empty or infinite 47 match self.orig.size_hint() { 48 size @ (0, Some(0)) => size, 49 (0, _) => (0, None), 50 _ => (usize::MAX, None), 51 } 52 } 53 } 54 55 impl<St> FusedStream for Cycle<St> 56 where 57 St: Clone + Stream, 58 { is_terminated(&self) -> bool59 fn is_terminated(&self) -> bool { 60 // the cycle stream is either empty or infinite 61 if let (0, Some(0)) = self.size_hint() { 62 true 63 } else { 64 false 65 } 66 } 67 } 68