1 use core::pin::Pin; 2 use futures_core::future::Future; 3 use futures_core::ready; 4 use futures_core::stream::TryStream; 5 use futures_core::task::{Context, Poll}; 6 use pin_project_lite::pin_project; 7 8 pin_project! { 9 /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. 10 #[derive(Debug)] 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct TryConcat<St: TryStream> { 13 #[pin] 14 stream: St, 15 accum: Option<St::Ok>, 16 } 17 } 18 19 impl<St> TryConcat<St> 20 where 21 St: TryStream, 22 St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, 23 { new(stream: St) -> Self24 pub(super) fn new(stream: St) -> Self { 25 Self { stream, accum: None } 26 } 27 } 28 29 impl<St> Future for TryConcat<St> 30 where 31 St: TryStream, 32 St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, 33 { 34 type Output = Result<St::Ok, St::Error>; 35 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>36 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 37 let mut this = self.project(); 38 39 Poll::Ready(Ok(loop { 40 if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) { 41 if let Some(a) = this.accum { 42 a.extend(x) 43 } else { 44 *this.accum = Some(x) 45 } 46 } else { 47 break this.accum.take().unwrap_or_default(); 48 } 49 })) 50 } 51 } 52