1 use futures_core::ready; 2 use futures_core::task::{Context, Poll}; 3 use futures_io::AsyncWrite; 4 use futures_sink::Sink; 5 use pin_project_lite::pin_project; 6 use std::io; 7 use std::pin::Pin; 8 9 #[derive(Debug)] 10 struct Block<Item> { 11 offset: usize, 12 bytes: Item, 13 } 14 15 pin_project! { 16 /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method. 17 #[must_use = "sinks do nothing unless polled"] 18 #[derive(Debug)] 19 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 20 pub struct IntoSink<W, Item> { 21 #[pin] 22 writer: W, 23 // An outstanding block for us to push into the underlying writer, along with an offset of how 24 // far into this block we have written already. 25 buffer: Option<Block<Item>>, 26 } 27 } 28 29 impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> { new(writer: W) -> Self30 pub(super) fn new(writer: W) -> Self { 31 Self { writer, buffer: None } 32 } 33 34 /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_ 35 /// flush the writer after it succeeds in pushing the block into it. poll_flush_buffer( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>>36 fn poll_flush_buffer( 37 self: Pin<&mut Self>, 38 cx: &mut Context<'_>, 39 ) -> Poll<Result<(), io::Error>> { 40 let mut this = self.project(); 41 42 if let Some(buffer) = this.buffer { 43 loop { 44 let bytes = buffer.bytes.as_ref(); 45 let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?; 46 buffer.offset += written; 47 if buffer.offset == bytes.len() { 48 break; 49 } 50 } 51 } 52 *this.buffer = None; 53 Poll::Ready(Ok(())) 54 } 55 } 56 57 impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> { 58 type Error = io::Error; 59 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>60 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 61 ready!(self.poll_flush_buffer(cx))?; 62 Poll::Ready(Ok(())) 63 } 64 start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>65 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 66 debug_assert!(self.buffer.is_none()); 67 *self.project().buffer = Some(Block { offset: 0, bytes: item }); 68 Ok(()) 69 } 70 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>71 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 72 ready!(self.as_mut().poll_flush_buffer(cx))?; 73 ready!(self.project().writer.poll_flush(cx))?; 74 Poll::Ready(Ok(())) 75 } 76 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>77 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 78 ready!(self.as_mut().poll_flush_buffer(cx))?; 79 ready!(self.project().writer.poll_close(cx))?; 80 Poll::Ready(Ok(())) 81 } 82 } 83