1 use crate::stream::{Fuse, IntoStream, StreamExt}; 2 3 use alloc::vec::Vec; 4 use core::pin::Pin; 5 use core::{fmt, mem}; 6 use futures_core::ready; 7 use futures_core::stream::{FusedStream, Stream, TryStream}; 8 use futures_core::task::{Context, Poll}; 9 #[cfg(feature = "sink")] 10 use futures_sink::Sink; 11 use pin_project_lite::pin_project; 12 13 pin_project! { 14 /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method. 15 #[derive(Debug)] 16 #[must_use = "streams do nothing unless polled"] 17 pub struct TryChunks<St: TryStream> { 18 #[pin] 19 stream: Fuse<IntoStream<St>>, 20 items: Vec<St::Ok>, 21 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 22 } 23 } 24 25 impl<St: TryStream> TryChunks<St> { new(stream: St, capacity: usize) -> Self26 pub(super) fn new(stream: St, capacity: usize) -> Self { 27 assert!(capacity > 0); 28 29 Self { 30 stream: IntoStream::new(stream).fuse(), 31 items: Vec::with_capacity(capacity), 32 cap: capacity, 33 } 34 } 35 take(self: Pin<&mut Self>) -> Vec<St::Ok>36 fn take(self: Pin<&mut Self>) -> Vec<St::Ok> { 37 let cap = self.cap; 38 mem::replace(self.project().items, Vec::with_capacity(cap)) 39 } 40 41 delegate_access_inner!(stream, St, (. .)); 42 } 43 44 type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>; 45 46 impl<St: TryStream> Stream for TryChunks<St> { 47 type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>; 48 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 50 let mut this = self.as_mut().project(); 51 loop { 52 match ready!(this.stream.as_mut().try_poll_next(cx)) { 53 // Push the item into the buffer and check whether it is full. 54 // If so, replace our buffer with a new and empty one and return 55 // the full one. 
56 Some(item) => match item { 57 Ok(item) => { 58 this.items.push(item); 59 if this.items.len() >= *this.cap { 60 return Poll::Ready(Some(Ok(self.take()))); 61 } 62 } 63 Err(e) => { 64 return Poll::Ready(Some(Err(TryChunksError(self.take(), e)))); 65 } 66 }, 67 68 // Since the underlying stream ran out of values, return what we 69 // have buffered, if we have anything. 70 None => { 71 let last = if this.items.is_empty() { 72 None 73 } else { 74 let full_buf = mem::take(this.items); 75 Some(full_buf) 76 }; 77 78 return Poll::Ready(last.map(Ok)); 79 } 80 } 81 } 82 } 83 size_hint(&self) -> (usize, Option<usize>)84 fn size_hint(&self) -> (usize, Option<usize>) { 85 let chunk_len = usize::from(!self.items.is_empty()); 86 let (lower, upper) = self.stream.size_hint(); 87 let lower = (lower / self.cap).saturating_add(chunk_len); 88 let upper = match upper { 89 Some(x) => x.checked_add(chunk_len), 90 None => None, 91 }; 92 (lower, upper) 93 } 94 } 95 96 impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> { is_terminated(&self) -> bool97 fn is_terminated(&self) -> bool { 98 self.stream.is_terminated() && self.items.is_empty() 99 } 100 } 101 102 // Forwarding impl of Sink from the underlying stream 103 #[cfg(feature = "sink")] 104 impl<S, Item> Sink<Item> for TryChunks<S> 105 where 106 S: TryStream + Sink<Item>, 107 { 108 type Error = <S as Sink<Item>>::Error; 109 110 delegate_sink!(stream, Item); 111 } 112 113 /// Error indicating, that while chunk was collected inner stream produced an error. 114 /// 115 /// Contains all items that were collected before an error occurred, and the stream error itself. 
#[derive(PartialEq, Eq)]
pub struct TryChunksError<T, E>(
    /// Items that were collected before the error occurred.
    pub Vec<T>,
    /// The error produced by the stream.
    pub E,
);

// `Debug` is written by hand so that it only requires `E: Debug`; the
// collected items are not printed and `T` needs no `Debug` impl.
impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
    /// Formats only the wrapped error, using its `Debug` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.1, f)
    }
}

impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
    /// Formats only the wrapped error, using its `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.1, f)
    }
}

#[cfg(feature = "std")]
impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}