1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream, TryStream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_project_lite::pin_project; 7 8 pin_project! { 9 /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. 10 #[derive(Debug)] 11 #[must_use = "streams do nothing unless polled"] 12 pub struct IntoStream<St> { 13 #[pin] 14 stream: St, 15 } 16 } 17 18 impl<St> IntoStream<St> { 19 #[inline] new(stream: St) -> Self20 pub(super) fn new(stream: St) -> Self { 21 Self { stream } 22 } 23 24 delegate_access_inner!(stream, St, ()); 25 } 26 27 impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> { is_terminated(&self) -> bool28 fn is_terminated(&self) -> bool { 29 self.stream.is_terminated() 30 } 31 } 32 33 impl<St: TryStream> Stream for IntoStream<St> { 34 type Item = Result<St::Ok, St::Error>; 35 36 #[inline] poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 38 self.project().stream.try_poll_next(cx) 39 } 40 size_hint(&self) -> (usize, Option<usize>)41 fn size_hint(&self) -> (usize, Option<usize>) { 42 self.stream.size_hint() 43 } 44 } 45 46 // Forwarding impl of Sink from the underlying stream 47 #[cfg(feature = "sink")] 48 impl<S: Sink<Item>, Item> Sink<Item> for IntoStream<S> { 49 type Error = S::Error; 50 51 delegate_sink!(stream, Item); 52 } 53