1 use super::DEFAULT_BUF_SIZE; 2 use futures_core::future::Future; 3 use futures_core::ready; 4 use futures_core::task::{Context, Poll}; 5 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; 6 use pin_project_lite::pin_project; 7 use std::boxed::Box; 8 use std::io::{self, Read}; 9 use std::pin::Pin; 10 use std::vec; 11 use std::{cmp, fmt}; 12 13 pin_project! { 14 /// The `BufReader` struct adds buffering to any reader. 15 /// 16 /// It can be excessively inefficient to work directly with a [`AsyncRead`] 17 /// instance. A `BufReader` performs large, infrequent reads on the underlying 18 /// [`AsyncRead`] and maintains an in-memory buffer of the results. 19 /// 20 /// `BufReader` can improve the speed of programs that make *small* and 21 /// *repeated* read calls to the same file or network socket. It does not 22 /// help when reading very large amounts at once, or reading just one or a few 23 /// times. It also provides no advantage when reading from a source that is 24 /// already in memory, like a `Vec<u8>`. 25 /// 26 /// When the `BufReader` is dropped, the contents of its buffer will be 27 /// discarded. Creating multiple instances of a `BufReader` on the same 28 /// stream can cause data loss. 29 /// 30 /// [`AsyncRead`]: futures_io::AsyncRead 31 /// 32 // TODO: Examples 33 pub struct BufReader<R> { 34 #[pin] 35 inner: R, 36 buffer: Box<[u8]>, 37 pos: usize, 38 cap: usize, 39 } 40 } 41 42 impl<R: AsyncRead> BufReader<R> { 43 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, 44 /// but may change in the future. new(inner: R) -> Self45 pub fn new(inner: R) -> Self { 46 Self::with_capacity(DEFAULT_BUF_SIZE, inner) 47 } 48 49 /// Creates a new `BufReader` with the specified buffer capacity. with_capacity(capacity: usize, inner: R) -> Self50 pub fn with_capacity(capacity: usize, inner: R) -> Self { 51 // TODO: consider using Box<[u8]>::new_uninit_slice once it stabilized 52 let buffer = vec![0; capacity]; 53 Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 } 54 } 55 } 56 57 impl<R> BufReader<R> { 58 delegate_access_inner!(inner, R, ()); 59 60 /// Returns a reference to the internally buffered data. 61 /// 62 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. buffer(&self) -> &[u8]63 pub fn buffer(&self) -> &[u8] { 64 &self.buffer[self.pos..self.cap] 65 } 66 67 /// Invalidates all data in the internal buffer. 68 #[inline] discard_buffer(self: Pin<&mut Self>)69 fn discard_buffer(self: Pin<&mut Self>) { 70 let this = self.project(); 71 *this.pos = 0; 72 *this.cap = 0; 73 } 74 } 75 76 impl<R: AsyncRead + AsyncSeek> BufReader<R> { 77 /// Seeks relative to the current position. If the new position lies within the buffer, 78 /// the buffer will not be flushed, allowing for more efficient seeks. 79 /// This method does not return the location of the underlying reader, so the caller 80 /// must track this information themselves if it is required. seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R>81 pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> { 82 SeeKRelative { inner: self, offset, first: true } 83 } 84 85 /// Attempts to seek relative to the current position. If the new position lies within the buffer, 86 /// the buffer will not be flushed, allowing for more efficient seeks. 87 /// This method does not return the location of the underlying reader, so the caller 88 /// must track this information themselves if it is required. poll_seek_relative( self: Pin<&mut Self>, cx: &mut Context<'_>, offset: i64, ) -> Poll<io::Result<()>>89 pub fn poll_seek_relative( 90 self: Pin<&mut Self>, 91 cx: &mut Context<'_>, 92 offset: i64, 93 ) -> Poll<io::Result<()>> { 94 let pos = self.pos as u64; 95 if offset < 0 { 96 if let Some(new_pos) = pos.checked_sub((-offset) as u64) { 97 *self.project().pos = new_pos as usize; 98 return Poll::Ready(Ok(())); 99 } 100 } else if let Some(new_pos) = pos.checked_add(offset as u64) { 101 if new_pos <= self.cap as u64 { 102 *self.project().pos = new_pos as usize; 103 return Poll::Ready(Ok(())); 104 } 105 } 106 self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ())) 107 } 108 } 109 110 impl<R: AsyncRead> AsyncRead for BufReader<R> { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>111 fn poll_read( 112 mut self: Pin<&mut Self>, 113 cx: &mut Context<'_>, 114 buf: &mut [u8], 115 ) -> Poll<io::Result<usize>> { 116 // If we don't have any buffered data and we're doing a massive read 117 // (larger than our internal buffer), bypass our internal buffer 118 // entirely. 119 if self.pos == self.cap && buf.len() >= self.buffer.len() { 120 let res = ready!(self.as_mut().project().inner.poll_read(cx, buf)); 121 self.discard_buffer(); 122 return Poll::Ready(res); 123 } 124 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; 125 let nread = rem.read(buf)?; 126 self.consume(nread); 127 Poll::Ready(Ok(nread)) 128 } 129 poll_read_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>130 fn poll_read_vectored( 131 mut self: Pin<&mut Self>, 132 cx: &mut Context<'_>, 133 bufs: &mut [IoSliceMut<'_>], 134 ) -> Poll<io::Result<usize>> { 135 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); 136 if self.pos == self.cap && total_len >= self.buffer.len() { 137 let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs)); 138 self.discard_buffer(); 139 return Poll::Ready(res); 140 } 141 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; 142 let nread = rem.read_vectored(bufs)?; 143 self.consume(nread); 144 Poll::Ready(Ok(nread)) 145 } 146 } 147 148 impl<R: AsyncRead> AsyncBufRead for BufReader<R> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>149 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 150 let this = self.project(); 151 152 // If we've reached the end of our internal buffer then we need to fetch 153 // some more data from the underlying reader. 154 // Branch using `>=` instead of the more correct `==` 155 // to tell the compiler that the pos..cap slice is always valid. 156 if *this.pos >= *this.cap { 157 debug_assert!(*this.pos == *this.cap); 158 *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?; 159 *this.pos = 0; 160 } 161 Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap])) 162 } 163 consume(self: Pin<&mut Self>, amt: usize)164 fn consume(self: Pin<&mut Self>, amt: usize) { 165 *self.project().pos = cmp::min(self.pos + amt, self.cap); 166 } 167 } 168 169 impl<R: AsyncWrite> AsyncWrite for BufReader<R> { 170 delegate_async_write!(inner); 171 } 172 173 impl<R: fmt::Debug> fmt::Debug for BufReader<R> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 175 f.debug_struct("BufReader") 176 .field("reader", &self.inner) 177 .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len())) 178 .finish() 179 } 180 } 181 182 impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { 183 /// Seek to an offset, in bytes, in the underlying reader. 184 /// 185 /// The position used for seeking with `SeekFrom::Current(_)` is the 186 /// position the underlying reader would be at if the `BufReader` had no 187 /// internal buffer. 188 /// 189 /// Seeking always discards the internal buffer, even if the seek position 190 /// would otherwise fall within it. This guarantees that calling 191 /// `.into_inner()` immediately after a seek yields the underlying reader 192 /// at the same position. 193 /// 194 /// To seek without discarding the internal buffer, use 195 /// [`BufReader::seek_relative`](BufReader::seek_relative) or 196 /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). 197 /// 198 /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. 199 /// 200 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` 201 /// where `n` minus the internal buffer length overflows an `i64`, two 202 /// seeks will be performed instead of one. If the second seek returns 203 /// `Err`, the underlying reader will be left at the same position it would 204 /// have if you called `seek` with `SeekFrom::Current(0)`. poll_seek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<io::Result<u64>>205 fn poll_seek( 206 mut self: Pin<&mut Self>, 207 cx: &mut Context<'_>, 208 pos: SeekFrom, 209 ) -> Poll<io::Result<u64>> { 210 let result: u64; 211 if let SeekFrom::Current(n) = pos { 212 let remainder = (self.cap - self.pos) as i64; 213 // it should be safe to assume that remainder fits within an i64 as the alternative 214 // means we managed to allocate 8 exbibytes and that's absurd. 215 // But it's not out of the realm of possibility for some weird underlying reader to 216 // support seeking by i64::MIN so we need to handle underflow when subtracting 217 // remainder. 218 if let Some(offset) = n.checked_sub(remainder) { 219 result = 220 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?; 221 } else { 222 // seek backwards by our remainder, and then by the offset 223 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?; 224 self.as_mut().discard_buffer(); 225 result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?; 226 } 227 } else { 228 // Seeking with Start/End doesn't care about our buffer length. 229 result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?; 230 } 231 self.discard_buffer(); 232 Poll::Ready(Ok(result)) 233 } 234 } 235 236 /// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method. 237 #[derive(Debug)] 238 #[must_use = "futures do nothing unless polled"] 239 pub struct SeeKRelative<'a, R> { 240 inner: Pin<&'a mut BufReader<R>>, 241 offset: i64, 242 first: bool, 243 } 244 245 impl<R> Future for SeeKRelative<'_, R> 246 where 247 R: AsyncRead + AsyncSeek, 248 { 249 type Output = io::Result<()>; 250 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>251 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 252 let offset = self.offset; 253 if self.first { 254 self.first = false; 255 self.inner.as_mut().poll_seek_relative(cx, offset) 256 } else { 257 self.inner 258 .as_mut() 259 .as_mut() 260 .poll_seek(cx, SeekFrom::Current(offset)) 261 .map(|res| res.map(|_| ())) 262 } 263 } 264 } 265