1 use bytes::{Bytes, BytesMut}; 2 use futures_core::stream::Stream; 3 use pin_project_lite::pin_project; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 use tokio::io::AsyncRead; 7 8 const DEFAULT_CAPACITY: usize = 4096; 9 10 pin_project! { 11 /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks. 12 /// 13 /// This stream is fused. It performs the inverse operation of 14 /// [`StreamReader`]. 15 /// 16 /// # Example 17 /// 18 /// ``` 19 /// # #[tokio::main] 20 /// # async fn main() -> std::io::Result<()> { 21 /// use tokio_stream::StreamExt; 22 /// use tokio_util::io::ReaderStream; 23 /// 24 /// // Create a stream of data. 25 /// let data = b"hello, world!"; 26 /// let mut stream = ReaderStream::new(&data[..]); 27 /// 28 /// // Read all of the chunks into a vector. 29 /// let mut stream_contents = Vec::new(); 30 /// while let Some(chunk) = stream.next().await { 31 /// stream_contents.extend_from_slice(&chunk?); 32 /// } 33 /// 34 /// // Once the chunks are concatenated, we should have the 35 /// // original data. 36 /// assert_eq!(stream_contents, data); 37 /// # Ok(()) 38 /// # } 39 /// ``` 40 /// 41 /// [`AsyncRead`]: tokio::io::AsyncRead 42 /// [`StreamReader`]: crate::io::StreamReader 43 /// [`Stream`]: futures_core::Stream 44 #[derive(Debug)] 45 pub struct ReaderStream<R> { 46 // Reader itself. 47 // 48 // This value is `None` if the stream has terminated. 49 #[pin] 50 reader: Option<R>, 51 // Working buffer, used to optimize allocations. 52 buf: BytesMut, 53 capacity: usize, 54 } 55 } 56 57 impl<R: AsyncRead> ReaderStream<R> { 58 /// Convert an [`AsyncRead`] into a [`Stream`] with item type 59 /// `Result<Bytes, std::io::Error>`. 60 /// 61 /// [`AsyncRead`]: tokio::io::AsyncRead 62 /// [`Stream`]: futures_core::Stream new(reader: R) -> Self63 pub fn new(reader: R) -> Self { 64 ReaderStream { 65 reader: Some(reader), 66 buf: BytesMut::new(), 67 capacity: DEFAULT_CAPACITY, 68 } 69 } 70 71 /// Convert an [`AsyncRead`] into a [`Stream`] with item type 72 /// `Result<Bytes, std::io::Error>`, 73 /// with a specific read buffer initial capacity. 74 /// 75 /// [`AsyncRead`]: tokio::io::AsyncRead 76 /// [`Stream`]: futures_core::Stream with_capacity(reader: R, capacity: usize) -> Self77 pub fn with_capacity(reader: R, capacity: usize) -> Self { 78 ReaderStream { 79 reader: Some(reader), 80 buf: BytesMut::with_capacity(capacity), 81 capacity, 82 } 83 } 84 } 85 86 impl<R: AsyncRead> Stream for ReaderStream<R> { 87 type Item = std::io::Result<Bytes>; poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>88 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 89 use crate::util::poll_read_buf; 90 91 let mut this = self.as_mut().project(); 92 93 let reader = match this.reader.as_pin_mut() { 94 Some(r) => r, 95 None => return Poll::Ready(None), 96 }; 97 98 if this.buf.capacity() == 0 { 99 this.buf.reserve(*this.capacity); 100 } 101 102 match poll_read_buf(reader, cx, &mut this.buf) { 103 Poll::Pending => Poll::Pending, 104 Poll::Ready(Err(err)) => { 105 self.project().reader.set(None); 106 Poll::Ready(Some(Err(err))) 107 } 108 Poll::Ready(Ok(0)) => { 109 self.project().reader.set(None); 110 Poll::Ready(None) 111 } 112 Poll::Ready(Ok(_)) => { 113 let chunk = this.buf.split(); 114 Poll::Ready(Some(Ok(chunk.freeze()))) 115 } 116 } 117 } 118 } 119