1 use futures_core::ready;
2 use futures_core::task::{Context, Poll};
3 use futures_io::AsyncWrite;
4 use futures_sink::Sink;
5 use pin_project_lite::pin_project;
6 use std::io;
7 use std::pin::Pin;
8 
9 #[derive(Debug)]
10 struct Block<Item> {
11     offset: usize,
12     bytes: Item,
13 }
14 
15 pin_project! {
16     /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
17     #[must_use = "sinks do nothing unless polled"]
18     #[derive(Debug)]
19     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
20     pub struct IntoSink<W, Item> {
21         #[pin]
22         writer: W,
23         // An outstanding block for us to push into the underlying writer, along with an offset of how
24         // far into this block we have written already.
25         buffer: Option<Block<Item>>,
26     }
27 }
28 
29 impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
new(writer: W) -> Self30     pub(super) fn new(writer: W) -> Self {
31         Self { writer, buffer: None }
32     }
33 
34     /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
35     /// flush the writer after it succeeds in pushing the block into it.
poll_flush_buffer( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>>36     fn poll_flush_buffer(
37         self: Pin<&mut Self>,
38         cx: &mut Context<'_>,
39     ) -> Poll<Result<(), io::Error>> {
40         let mut this = self.project();
41 
42         if let Some(buffer) = this.buffer {
43             loop {
44                 let bytes = buffer.bytes.as_ref();
45                 let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
46                 buffer.offset += written;
47                 if buffer.offset == bytes.len() {
48                     break;
49                 }
50             }
51         }
52         *this.buffer = None;
53         Poll::Ready(Ok(()))
54     }
55 }
56 
57 impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
58     type Error = io::Error;
59 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>60     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61         ready!(self.poll_flush_buffer(cx))?;
62         Poll::Ready(Ok(()))
63     }
64 
start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>65     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
66         debug_assert!(self.buffer.is_none());
67         *self.project().buffer = Some(Block { offset: 0, bytes: item });
68         Ok(())
69     }
70 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>71     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72         ready!(self.as_mut().poll_flush_buffer(cx))?;
73         ready!(self.project().writer.poll_flush(cx))?;
74         Poll::Ready(Ok(()))
75     }
76 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>77     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78         ready!(self.as_mut().poll_flush_buffer(cx))?;
79         ready!(self.project().writer.poll_close(cx))?;
80         Poll::Ready(Ok(()))
81     }
82 }
83