1 use core::pin::Pin;
2 use futures_core::future::Future;
3 use futures_core::ready;
4 use futures_core::stream::TryStream;
5 use futures_core::task::{Context, Poll};
6 use pin_project_lite::pin_project;
7 
pin_project! {
    /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
    ///
    /// Drives the wrapped stream to completion, merging every successful item
    /// into a single `St::Ok` value via [`Extend`]. The future resolves to
    /// that merged value, or to the first error yielded by the stream. If the
    /// stream ends without yielding any item, the output is `St::Ok::default()`.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TryConcat<St: TryStream> {
        // Source stream; `#[pin]` makes the generated projection expose it as
        // a pinned reference so it can be polled in place.
        #[pin]
        stream: St,
        // Running result. Starts as `None`, becomes `Some` with the first item
        // received, and is extended with each later item; it is taken out
        // when the stream is exhausted.
        accum: Option<St::Ok>,
    }
}
18 
19 impl<St> TryConcat<St>
20 where
21     St: TryStream,
22     St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
23 {
new(stream: St) -> Self24     pub(super) fn new(stream: St) -> Self {
25         Self { stream, accum: None }
26     }
27 }
28 
29 impl<St> Future for TryConcat<St>
30 where
31     St: TryStream,
32     St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
33 {
34     type Output = Result<St::Ok, St::Error>;
35 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>36     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37         let mut this = self.project();
38 
39         Poll::Ready(Ok(loop {
40             if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
41                 if let Some(a) = this.accum {
42                     a.extend(x)
43                 } else {
44                     *this.accum = Some(x)
45                 }
46             } else {
47                 break this.accum.take().unwrap_or_default();
48             }
49         }))
50     }
51 }
52