use super::DEFAULT_BUF_SIZE;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
use pin_project_lite::pin_project;
use std::boxed::Box;
use std::io::{self, Read};
use std::pin::Pin;
use std::vec;
use std::{cmp, fmt};

pin_project! {
    /// The `BufReader` struct adds buffering to any reader.
    ///
    /// It can be excessively inefficient to work directly with an [`AsyncRead`]
    /// instance. A `BufReader` performs large, infrequent reads on the underlying
    /// [`AsyncRead`] and maintains an in-memory buffer of the results.
    ///
    /// `BufReader` can improve the speed of programs that make *small* and
    /// *repeated* read calls to the same file or network socket. It does not
    /// help when reading very large amounts at once, or reading just one or a few
    /// times. It also provides no advantage when reading from a source that is
    /// already in memory, like a `Vec<u8>`.
    ///
    /// When the `BufReader` is dropped, the contents of its buffer will be
    /// discarded. Creating multiple instances of a `BufReader` on the same
    /// stream can cause data loss.
    ///
    /// [`AsyncRead`]: futures_io::AsyncRead
    ///
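    /// # Examples
    ///
    /// A minimal sketch of buffered line reading; it assumes the `futures`
    /// facade crate (for `block_on`, `Cursor`, and the `AsyncBufReadExt`
    /// adapter), which lives outside this module:
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::io::{AsyncBufReadExt, BufReader, Cursor};
    ///
    /// let mut reader = BufReader::new(Cursor::new(b"hello\nworld"));
    /// let mut line = String::new();
    /// reader.read_line(&mut line).await?;
    /// assert_eq!(line, "hello\n");
    /// # Ok::<(), std::io::Error>(()) }).unwrap();
    /// ```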
    pub struct BufReader<R> {
        #[pin]
        inner: R,
        buffer: Box<[u8]>,
        pos: usize,
        cap: usize,
    }
}

impl<R: AsyncRead> BufReader<R> {
    /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
    /// but may change in the future.
    pub fn new(inner: R) -> Self {
        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
    }

    /// Creates a new `BufReader` with the specified buffer capacity.
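    ///
    /// # Examples
    ///
    /// A small construction sketch (the `Cursor` type is assumed to come from
    /// the `futures` facade crate):
    ///
    /// ```
    /// use futures::io::{BufReader, Cursor};
    ///
    /// let reader = BufReader::with_capacity(10, Cursor::new(b"lorem"));
    /// assert!(reader.buffer().is_empty());
    /// ```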
    pub fn with_capacity(capacity: usize, inner: R) -> Self {
        // TODO: consider using Box<[u8]>::new_uninit_slice once it is stabilized
        let buffer = vec![0; capacity];
        Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
    }
}

impl<R> BufReader<R> {
    delegate_access_inner!(inner, R, ());

    /// Returns a reference to the internally buffered data.
    ///
    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
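    ///
    /// # Examples
    ///
    /// A rough sketch of the difference (the executor, `Cursor`, and the
    /// `AsyncReadExt` adapter are assumed to come from the `futures` facade
    /// crate):
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::io::{AsyncReadExt, BufReader, Cursor};
    ///
    /// let mut reader = BufReader::new(Cursor::new(b"spam"));
    /// assert!(reader.buffer().is_empty()); // nothing has been read yet
    ///
    /// let mut byte = [0u8; 1];
    /// reader.read_exact(&mut byte).await?; // fills the internal buffer
    /// assert_eq!(reader.buffer(), &b"pam"[..]);
    /// # Ok::<(), std::io::Error>(()) }).unwrap();
    /// ```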
    pub fn buffer(&self) -> &[u8] {
        &self.buffer[self.pos..self.cap]
    }

    /// Invalidates all data in the internal buffer.
    #[inline]
    fn discard_buffer(self: Pin<&mut Self>) {
        let this = self.project();
        *this.pos = 0;
        *this.cap = 0;
    }
}

impl<R: AsyncRead + AsyncSeek> BufReader<R> {
    /// Seeks relative to the current position. If the new position lies within the buffer,
    /// the buffer will not be flushed, allowing for more efficient seeks.
    /// This method does not return the location of the underlying reader, so the caller
    /// must track this information themselves if it is required.
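    ///
    /// # Examples
    ///
    /// A rough sketch (the executor, `Cursor`, and the `AsyncReadExt` adapter
    /// are assumed to come from the `futures` facade crate):
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use std::pin::Pin;
    /// use futures::io::{AsyncReadExt, BufReader, Cursor};
    ///
    /// let mut reader = BufReader::new(Cursor::new(b"abcdefg"));
    /// let mut byte = [0u8; 1];
    /// reader.read_exact(&mut byte).await?;           // buffer now holds "bcdefg"
    /// Pin::new(&mut reader).seek_relative(2).await?; // stays inside the buffer
    /// reader.read_exact(&mut byte).await?;
    /// assert_eq!(&byte, b"d");
    /// # Ok::<(), std::io::Error>(()) }).unwrap();
    /// ```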
    pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
        SeeKRelative { inner: self, offset, first: true }
    }

    /// Attempts to seek relative to the current position. If the new position lies within the buffer,
    /// the buffer will not be flushed, allowing for more efficient seeks.
    /// This method does not return the location of the underlying reader, so the caller
    /// must track this information themselves if it is required.
    pub fn poll_seek_relative(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset: i64,
    ) -> Poll<io::Result<()>> {
        let pos = self.pos as u64;
        if offset < 0 {
            if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
                *self.project().pos = new_pos as usize;
                return Poll::Ready(Ok(()));
            }
        } else if let Some(new_pos) = pos.checked_add(offset as u64) {
            if new_pos <= self.cap as u64 {
                *self.project().pos = new_pos as usize;
                return Poll::Ready(Ok(()));
            }
        }
        self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
    }
}

impl<R: AsyncRead> AsyncRead for BufReader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        if self.pos == self.cap && buf.len() >= self.buffer.len() {
            let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = rem.read(buf)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }

    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
        if self.pos == self.cap && total_len >= self.buffer.len() {
            let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = rem.read_vectored(bufs)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }
}

impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        // If we've reached the end of our internal buffer then we need to fetch
        // some more data from the underlying reader.
        // Branch using `>=` instead of the more correct `==`
        // to tell the compiler that the pos..cap slice is always valid.
        if *this.pos >= *this.cap {
            debug_assert!(*this.pos == *this.cap);
            *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
            *this.pos = 0;
        }
        Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        *self.project().pos = cmp::min(self.pos + amt, self.cap);
    }
}

impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
    delegate_async_write!(inner);
}

impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BufReader")
            .field("reader", &self.inner)
            .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
            .finish()
    }
}

impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
    /// Seek to an offset, in bytes, in the underlying reader.
    ///
    /// The position used for seeking with `SeekFrom::Current(_)` is the
    /// position the underlying reader would be at if the `BufReader` had no
    /// internal buffer.
    ///
    /// Seeking always discards the internal buffer, even if the seek position
    /// would otherwise fall within it. This guarantees that calling
    /// `.into_inner()` immediately after a seek yields the underlying reader
    /// at the same position.
    ///
    /// To seek without discarding the internal buffer, use
    /// [`BufReader::seek_relative`](BufReader::seek_relative) or
    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
    ///
    /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
    ///
    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
    /// where `n` minus the internal buffer length overflows an `i64`, two
    /// seeks will be performed instead of one. If the second seek returns
    /// `Err`, the underlying reader will be left at the same position it would
    /// have if you called `seek` with `SeekFrom::Current(0)`.
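    ///
    /// # Examples
    ///
    /// A rough sketch using the `AsyncSeekExt::seek` adapter (the executor,
    /// `Cursor`, and the extension traits are assumed to come from the
    /// `futures` facade crate):
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::io::{AsyncReadExt, AsyncSeekExt, BufReader, Cursor, SeekFrom};
    ///
    /// let mut reader = BufReader::new(Cursor::new(b"abcdefg"));
    /// reader.seek(SeekFrom::Start(3)).await?; // discards any buffered data
    /// let mut byte = [0u8; 1];
    /// reader.read_exact(&mut byte).await?;
    /// assert_eq!(&byte, b"d");
    /// # Ok::<(), std::io::Error>(()) }).unwrap();
    /// ```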
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        let result: u64;
        if let SeekFrom::Current(n) = pos {
            let remainder = (self.cap - self.pos) as i64;
            // it should be safe to assume that remainder fits within an i64 as the alternative
            // means we managed to allocate 8 exbibytes and that's absurd.
            // But it's not out of the realm of possibility for some weird underlying reader to
            // support seeking by i64::MIN so we need to handle underflow when subtracting
            // remainder.
            if let Some(offset) = n.checked_sub(remainder) {
                result =
                    ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
            } else {
                // seek backwards by our remainder, and then by the offset
                ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
                self.as_mut().discard_buffer();
                result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
            }
        } else {
            // Seeking with Start/End doesn't care about our buffer length.
            result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
        }
        self.discard_buffer();
        Poll::Ready(Ok(result))
    }
}

/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SeeKRelative<'a, R> {
    inner: Pin<&'a mut BufReader<R>>,
    offset: i64,
    first: bool,
}

impl<R> Future for SeeKRelative<'_, R>
where
    R: AsyncRead + AsyncSeek,
{
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let offset = self.offset;
        if self.first {
            self.first = false;
            self.inner.as_mut().poll_seek_relative(cx, offset)
        } else {
            self.inner
                .as_mut()
                .as_mut()
                .poll_seek(cx, SeekFrom::Current(offset))
                .map(|res| res.map(|_| ()))
        }
    }
}