1 use bytes::{Bytes, BytesMut};
2 use futures_core::stream::Stream;
3 use pin_project_lite::pin_project;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 use tokio::io::AsyncRead;
7 
8 const DEFAULT_CAPACITY: usize = 4096;
9 
10 pin_project! {
11     /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
12     ///
13     /// This stream is fused. It performs the inverse operation of
14     /// [`StreamReader`].
15     ///
16     /// # Example
17     ///
18     /// ```
19     /// # #[tokio::main]
20     /// # async fn main() -> std::io::Result<()> {
21     /// use tokio_stream::StreamExt;
22     /// use tokio_util::io::ReaderStream;
23     ///
24     /// // Create a stream of data.
25     /// let data = b"hello, world!";
26     /// let mut stream = ReaderStream::new(&data[..]);
27     ///
28     /// // Read all of the chunks into a vector.
29     /// let mut stream_contents = Vec::new();
30     /// while let Some(chunk) = stream.next().await {
31     ///    stream_contents.extend_from_slice(&chunk?);
32     /// }
33     ///
34     /// // Once the chunks are concatenated, we should have the
35     /// // original data.
36     /// assert_eq!(stream_contents, data);
37     /// # Ok(())
38     /// # }
39     /// ```
40     ///
41     /// [`AsyncRead`]: tokio::io::AsyncRead
42     /// [`StreamReader`]: crate::io::StreamReader
43     /// [`Stream`]: futures_core::Stream
44     #[derive(Debug)]
45     pub struct ReaderStream<R> {
46         // Reader itself.
47         //
48         // This value is `None` if the stream has terminated.
49         #[pin]
50         reader: Option<R>,
51         // Working buffer, used to optimize allocations.
52         buf: BytesMut,
53         capacity: usize,
54     }
55 }
56 
57 impl<R: AsyncRead> ReaderStream<R> {
58     /// Convert an [`AsyncRead`] into a [`Stream`] with item type
59     /// `Result<Bytes, std::io::Error>`.
60     ///
61     /// [`AsyncRead`]: tokio::io::AsyncRead
62     /// [`Stream`]: futures_core::Stream
new(reader: R) -> Self63     pub fn new(reader: R) -> Self {
64         ReaderStream {
65             reader: Some(reader),
66             buf: BytesMut::new(),
67             capacity: DEFAULT_CAPACITY,
68         }
69     }
70 
71     /// Convert an [`AsyncRead`] into a [`Stream`] with item type
72     /// `Result<Bytes, std::io::Error>`,
73     /// with a specific read buffer initial capacity.
74     ///
75     /// [`AsyncRead`]: tokio::io::AsyncRead
76     /// [`Stream`]: futures_core::Stream
with_capacity(reader: R, capacity: usize) -> Self77     pub fn with_capacity(reader: R, capacity: usize) -> Self {
78         ReaderStream {
79             reader: Some(reader),
80             buf: BytesMut::with_capacity(capacity),
81             capacity,
82         }
83     }
84 }
85 
86 impl<R: AsyncRead> Stream for ReaderStream<R> {
87     type Item = std::io::Result<Bytes>;
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>88     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89         use crate::util::poll_read_buf;
90 
91         let mut this = self.as_mut().project();
92 
93         let reader = match this.reader.as_pin_mut() {
94             Some(r) => r,
95             None => return Poll::Ready(None),
96         };
97 
98         if this.buf.capacity() == 0 {
99             this.buf.reserve(*this.capacity);
100         }
101 
102         match poll_read_buf(reader, cx, &mut this.buf) {
103             Poll::Pending => Poll::Pending,
104             Poll::Ready(Err(err)) => {
105                 self.project().reader.set(None);
106                 Poll::Ready(Some(Err(err)))
107             }
108             Poll::Ready(Ok(0)) => {
109                 self.project().reader.set(None);
110                 Poll::Ready(None)
111             }
112             Poll::Ready(Ok(_)) => {
113                 let chunk = this.buf.split();
114                 Poll::Ready(Some(Ok(chunk.freeze())))
115             }
116         }
117     }
118 }
119