1 use crate::io::{AsyncBufRead, AsyncWrite}; 2 use std::future::Future; 3 use std::io; 4 use std::pin::Pin; 5 use std::task::{ready, Context, Poll}; 6 7 cfg_io_util! { 8 /// A future that asynchronously copies the entire contents of a reader into a 9 /// writer. 10 /// 11 /// This struct is generally created by calling [`copy_buf`][copy_buf]. Please 12 /// see the documentation of `copy_buf()` for more details. 13 /// 14 /// [copy_buf]: copy_buf() 15 #[derive(Debug)] 16 #[must_use = "futures do nothing unless you `.await` or poll them"] 17 struct CopyBuf<'a, R: ?Sized, W: ?Sized> { 18 reader: &'a mut R, 19 writer: &'a mut W, 20 amt: u64, 21 } 22 23 /// Asynchronously copies the entire contents of a reader into a writer. 24 /// 25 /// This function returns a future that will continuously read data from 26 /// `reader` and then write it into `writer` in a streaming fashion until 27 /// `reader` returns EOF or fails. 28 /// 29 /// On success, the total number of bytes that were copied from `reader` to 30 /// `writer` is returned. 31 /// 32 /// This is a [`tokio::io::copy`] alternative for [`AsyncBufRead`] readers 33 /// with no extra buffer allocation, since [`AsyncBufRead`] allow access 34 /// to the reader's inner buffer. 35 /// 36 /// [`tokio::io::copy`]: crate::io::copy 37 /// [`AsyncBufRead`]: crate::io::AsyncBufRead 38 /// 39 /// # Errors 40 /// 41 /// The returned future will finish with an error will return an error 42 /// immediately if any call to `poll_fill_buf` or `poll_write` returns an 43 /// error. 44 /// 45 /// # Examples 46 /// 47 /// ``` 48 /// use tokio::io; 49 /// 50 /// # async fn dox() -> std::io::Result<()> { 51 /// let mut reader: &[u8] = b"hello"; 52 /// let mut writer: Vec<u8> = vec![]; 53 /// 54 /// io::copy_buf(&mut reader, &mut writer).await?; 55 /// 56 /// assert_eq!(b"hello", &writer[..]); 57 /// # Ok(()) 58 /// # } 59 /// ``` 60 pub async fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64> 61 where 62 R: AsyncBufRead + Unpin + ?Sized, 63 W: AsyncWrite + Unpin + ?Sized, 64 { 65 CopyBuf { 66 reader, 67 writer, 68 amt: 0, 69 }.await 70 } 71 } 72 73 impl<R, W> Future for CopyBuf<'_, R, W> 74 where 75 R: AsyncBufRead + Unpin + ?Sized, 76 W: AsyncWrite + Unpin + ?Sized, 77 { 78 type Output = io::Result<u64>; 79 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>80 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 81 loop { 82 let me = &mut *self; 83 let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; 84 if buffer.is_empty() { 85 ready!(Pin::new(&mut self.writer).poll_flush(cx))?; 86 return Poll::Ready(Ok(self.amt)); 87 } 88 89 let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; 90 if i == 0 { 91 return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); 92 } 93 self.amt += i as u64; 94 Pin::new(&mut *self.reader).consume(i); 95 } 96 } 97 } 98 99 #[cfg(test)] 100 mod tests { 101 use super::*; 102 103 #[test] assert_unpin()104 fn assert_unpin() { 105 use std::marker::PhantomPinned; 106 crate::is_unpin::<CopyBuf<'_, PhantomPinned, PhantomPinned>>(); 107 } 108 } 109