1 use crate::stream::{Fuse, IntoStream, StreamExt};
2 
3 use alloc::vec::Vec;
4 use core::pin::Pin;
5 use core::{fmt, mem};
6 use futures_core::ready;
7 use futures_core::stream::{FusedStream, Stream, TryStream};
8 use futures_core::task::{Context, Poll};
9 #[cfg(feature = "sink")]
10 use futures_sink::Sink;
11 use pin_project_lite::pin_project;
12 
13 pin_project! {
14     /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
15     #[derive(Debug)]
16     #[must_use = "streams do nothing unless polled"]
17     pub struct TryChunks<St: TryStream> {
18         #[pin]
19         stream: Fuse<IntoStream<St>>,
20         items: Vec<St::Ok>,
21         cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
22     }
23 }
24 
25 impl<St: TryStream> TryChunks<St> {
new(stream: St, capacity: usize) -> Self26     pub(super) fn new(stream: St, capacity: usize) -> Self {
27         assert!(capacity > 0);
28 
29         Self {
30             stream: IntoStream::new(stream).fuse(),
31             items: Vec::with_capacity(capacity),
32             cap: capacity,
33         }
34     }
35 
take(self: Pin<&mut Self>) -> Vec<St::Ok>36     fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
37         let cap = self.cap;
38         mem::replace(self.project().items, Vec::with_capacity(cap))
39     }
40 
41     delegate_access_inner!(stream, St, (. .));
42 }
43 
44 type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
45 
46 impl<St: TryStream> Stream for TryChunks<St> {
47     type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>;
48 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>49     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50         let mut this = self.as_mut().project();
51         loop {
52             match ready!(this.stream.as_mut().try_poll_next(cx)) {
53                 // Push the item into the buffer and check whether it is full.
54                 // If so, replace our buffer with a new and empty one and return
55                 // the full one.
56                 Some(item) => match item {
57                     Ok(item) => {
58                         this.items.push(item);
59                         if this.items.len() >= *this.cap {
60                             return Poll::Ready(Some(Ok(self.take())));
61                         }
62                     }
63                     Err(e) => {
64                         return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
65                     }
66                 },
67 
68                 // Since the underlying stream ran out of values, return what we
69                 // have buffered, if we have anything.
70                 None => {
71                     let last = if this.items.is_empty() {
72                         None
73                     } else {
74                         let full_buf = mem::take(this.items);
75                         Some(full_buf)
76                     };
77 
78                     return Poll::Ready(last.map(Ok));
79                 }
80             }
81         }
82     }
83 
size_hint(&self) -> (usize, Option<usize>)84     fn size_hint(&self) -> (usize, Option<usize>) {
85         let chunk_len = usize::from(!self.items.is_empty());
86         let (lower, upper) = self.stream.size_hint();
87         let lower = (lower / self.cap).saturating_add(chunk_len);
88         let upper = match upper {
89             Some(x) => x.checked_add(chunk_len),
90             None => None,
91         };
92         (lower, upper)
93     }
94 }
95 
96 impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
is_terminated(&self) -> bool97     fn is_terminated(&self) -> bool {
98         self.stream.is_terminated() && self.items.is_empty()
99     }
100 }
101 
102 // Forwarding impl of Sink from the underlying stream
103 #[cfg(feature = "sink")]
104 impl<S, Item> Sink<Item> for TryChunks<S>
105 where
106     S: TryStream + Sink<Item>,
107 {
108     type Error = <S as Sink<Item>>::Error;
109 
110     delegate_sink!(stream, Item);
111 }
112 
113 /// Error indicating, that while chunk was collected inner stream produced an error.
114 ///
115 /// Contains all items that were collected before an error occurred, and the stream error itself.
116 #[derive(PartialEq, Eq)]
117 pub struct TryChunksError<T, E>(pub Vec<T>, pub E);
118 
119 impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result120     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121         self.1.fmt(f)
122     }
123 }
124 
125 impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result126     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127         self.1.fmt(f)
128     }
129 }
130 
131 #[cfg(feature = "std")]
132 impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}
133