1 use std::io::{self, IoSlice};
2 use std::ops::DerefMut;
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5 
6 /// Writes bytes asynchronously.
7 ///
8 /// The trait inherits from [`std::io::Write`] and indicates that an I/O object is
9 /// **nonblocking**. All non-blocking I/O objects must return an error when
10 /// bytes cannot be written instead of blocking the current thread.
11 ///
12 /// Specifically, this means that the [`poll_write`] function will return one of
13 /// the following:
14 ///
15 /// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately
16 ///   written.
17 ///
18 /// * `Poll::Pending` means that no data was written from the buffer
19 ///   provided. The I/O object is not currently writable but may become writable
20 ///   in the future. Most importantly, **the current future's task is scheduled
21 ///   to get unparked when the object is writable**. This means that like
22 ///   `Future::poll` you'll receive a notification when the I/O object is
23 ///   writable again.
24 ///
25 /// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the
26 ///   underlying object.
27 ///
28 /// This trait importantly means that the [`write`][stdwrite] method only works in
29 /// the context of a future's task. The object may panic if used outside of a task.
30 ///
31 /// Note that this trait also represents that the  [`Write::flush`][stdflush] method
32 /// works very similarly to the `write` method, notably that `Ok(())` means that the
33 /// writer has successfully been flushed, a "would block" error means that the
34 /// current task is ready to receive a notification when flushing can make more
35 /// progress, and otherwise normal errors can happen as well.
36 ///
37 /// Utilities for working with `AsyncWrite` values are provided by
38 /// [`AsyncWriteExt`].
39 ///
40 /// [`std::io::Write`]: std::io::Write
41 /// [`poll_write`]: AsyncWrite::poll_write()
42 /// [stdwrite]: std::io::Write::write()
43 /// [stdflush]: std::io::Write::flush()
44 /// [`AsyncWriteExt`]: crate::io::AsyncWriteExt
45 pub trait AsyncWrite {
46     /// Attempt to write bytes from `buf` into the object.
47     ///
48     /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If successful,
49     /// then it must be guaranteed that `n <= buf.len()`. A return value of `0`
50     /// typically means that the underlying object is no longer able to accept
51     /// bytes and will likely not be able to in the future as well, or that the
52     /// buffer provided is empty.
53     ///
54     /// If the object is not ready for writing, the method returns
55     /// `Poll::Pending` and arranges for the current task (via
56     /// `cx.waker()`) to receive a notification when the object becomes
57     /// writable or is closed.
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>58     fn poll_write(
59         self: Pin<&mut Self>,
60         cx: &mut Context<'_>,
61         buf: &[u8],
62     ) -> Poll<Result<usize, io::Error>>;
63 
64     /// Attempts to flush the object, ensuring that any buffered data reach
65     /// their destination.
66     ///
67     /// On success, returns `Poll::Ready(Ok(()))`.
68     ///
69     /// If flushing cannot immediately complete, this method returns
70     /// `Poll::Pending` and arranges for the current task (via
71     /// `cx.waker()`) to receive a notification when the object can make
72     /// progress towards flushing.
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>73     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
74 
75     /// Initiates or attempts to shut down this writer, returning success when
76     /// the I/O connection has completely shut down.
77     ///
78     /// This method is intended to be used for asynchronous shutdown of I/O
79     /// connections. For example this is suitable for implementing shutdown of a
80     /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
81     /// Protocols sometimes need to flush out final pieces of data or otherwise
82     /// perform a graceful shutdown handshake, reading/writing more data as
83     /// appropriate. This method is the hook for such protocols to implement the
84     /// graceful shutdown logic.
85     ///
86     /// This `shutdown` method is required by implementers of the
87     /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
88     /// through to the wrapped type, and base types will typically implement
89     /// shutdown logic here or just return `Ok(().into())`. Note that if you're
90     /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
91     /// transitively the entire stream has been shut down. After your wrapper's
92     /// shutdown logic has been executed you should shut down the underlying
93     /// stream.
94     ///
95     /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
96     /// method returns `Ready` it implies that a flush successfully happened
97     /// before the shutdown happened. That is, callers don't need to call
98     /// `flush` before calling `shutdown`. They can rely that by calling
99     /// `shutdown` any pending buffered data will be written out.
100     ///
101     /// # Return value
102     ///
103     /// This function returns a `Poll<io::Result<()>>` classified as such:
104     ///
105     /// * `Poll::Ready(Ok(()))` - indicates that the connection was
106     ///   successfully shut down and is now safe to deallocate/drop/close
107     ///   resources associated with it. This method means that the current task
108     ///   will no longer receive any notifications due to this method and the
109     ///   I/O object itself is likely no longer usable.
110     ///
111     /// * `Poll::Pending` - indicates that shutdown is initiated but could
112     ///   not complete just yet. This may mean that more I/O needs to happen to
113     ///   continue this shutdown operation. The current task is scheduled to
114     ///   receive a notification when it's otherwise ready to continue the
115     ///   shutdown operation. When woken up this method should be called again.
116     ///
117     /// * `Poll::Ready(Err(e))` - indicates a fatal error has happened with shutdown,
118     ///   indicating that the shutdown operation did not complete successfully.
119     ///   This typically means that the I/O object is no longer usable.
120     ///
121     /// # Errors
122     ///
123     /// This function can return normal I/O errors through `Err`, described
124     /// above. Additionally this method may also render the underlying
125     /// `Write::write` method no longer usable (e.g. will return errors in the
126     /// future). It's recommended that once `shutdown` is called the
127     /// `write` method is no longer called.
128     ///
129     /// # Panics
130     ///
131     /// This function will panic if not called within the context of a future's
132     /// task.
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>133     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
134 
135     /// Like [`poll_write`], except that it writes from a slice of buffers.
136     ///
137     /// Data is copied from each buffer in order, with the final buffer
138     /// read from possibly being only partially consumed. This method must
139     /// behave as a call to [`write`] with the buffers concatenated would.
140     ///
141     /// The default implementation calls [`poll_write`] with either the first nonempty
142     /// buffer provided, or an empty one if none exists.
143     ///
144     /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
145     ///
146     /// If the object is not ready for writing, the method returns
147     /// `Poll::Pending` and arranges for the current task (via
148     /// `cx.waker()`) to receive a notification when the object becomes
149     /// writable or is closed.
150     ///
151     /// # Note
152     ///
153     /// This should be implemented as a single "atomic" write action. If any
154     /// data has been partially written, it is wrong to return an error or
155     /// pending.
156     ///
157     /// [`poll_write`]: AsyncWrite::poll_write
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize, io::Error>>158     fn poll_write_vectored(
159         self: Pin<&mut Self>,
160         cx: &mut Context<'_>,
161         bufs: &[IoSlice<'_>],
162     ) -> Poll<Result<usize, io::Error>> {
163         let buf = bufs
164             .iter()
165             .find(|b| !b.is_empty())
166             .map_or(&[][..], |b| &**b);
167         self.poll_write(cx, buf)
168     }
169 
170     /// Determines if this writer has an efficient [`poll_write_vectored`]
171     /// implementation.
172     ///
173     /// If a writer does not override the default [`poll_write_vectored`]
174     /// implementation, code using it may want to avoid the method all together
175     /// and coalesce writes into a single buffer for higher performance.
176     ///
177     /// The default implementation returns `false`.
178     ///
179     /// [`poll_write_vectored`]: AsyncWrite::poll_write_vectored
is_write_vectored(&self) -> bool180     fn is_write_vectored(&self) -> bool {
181         false
182     }
183 }
184 
185 macro_rules! deref_async_write {
186     () => {
187         fn poll_write(
188             mut self: Pin<&mut Self>,
189             cx: &mut Context<'_>,
190             buf: &[u8],
191         ) -> Poll<io::Result<usize>> {
192             Pin::new(&mut **self).poll_write(cx, buf)
193         }
194 
195         fn poll_write_vectored(
196             mut self: Pin<&mut Self>,
197             cx: &mut Context<'_>,
198             bufs: &[IoSlice<'_>],
199         ) -> Poll<io::Result<usize>> {
200             Pin::new(&mut **self).poll_write_vectored(cx, bufs)
201         }
202 
203         fn is_write_vectored(&self) -> bool {
204             (**self).is_write_vectored()
205         }
206 
207         fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
208             Pin::new(&mut **self).poll_flush(cx)
209         }
210 
211         fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
212             Pin::new(&mut **self).poll_shutdown(cx)
213         }
214     };
215 }
216 
217 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
218     deref_async_write!();
219 }
220 
221 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
222     deref_async_write!();
223 }
224 
225 impl<P> AsyncWrite for Pin<P>
226 where
227     P: DerefMut + Unpin,
228     P::Target: AsyncWrite,
229 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>230     fn poll_write(
231         self: Pin<&mut Self>,
232         cx: &mut Context<'_>,
233         buf: &[u8],
234     ) -> Poll<io::Result<usize>> {
235         self.get_mut().as_mut().poll_write(cx, buf)
236     }
237 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>238     fn poll_write_vectored(
239         self: Pin<&mut Self>,
240         cx: &mut Context<'_>,
241         bufs: &[IoSlice<'_>],
242     ) -> Poll<io::Result<usize>> {
243         self.get_mut().as_mut().poll_write_vectored(cx, bufs)
244     }
245 
is_write_vectored(&self) -> bool246     fn is_write_vectored(&self) -> bool {
247         (**self).is_write_vectored()
248     }
249 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>250     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
251         self.get_mut().as_mut().poll_flush(cx)
252     }
253 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>254     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
255         self.get_mut().as_mut().poll_shutdown(cx)
256     }
257 }
258 
259 impl AsyncWrite for Vec<u8> {
poll_write( self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>260     fn poll_write(
261         self: Pin<&mut Self>,
262         _cx: &mut Context<'_>,
263         buf: &[u8],
264     ) -> Poll<io::Result<usize>> {
265         self.get_mut().extend_from_slice(buf);
266         Poll::Ready(Ok(buf.len()))
267     }
268 
poll_write_vectored( mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>269     fn poll_write_vectored(
270         mut self: Pin<&mut Self>,
271         _: &mut Context<'_>,
272         bufs: &[IoSlice<'_>],
273     ) -> Poll<io::Result<usize>> {
274         Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
275     }
276 
is_write_vectored(&self) -> bool277     fn is_write_vectored(&self) -> bool {
278         true
279     }
280 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>281     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
282         Poll::Ready(Ok(()))
283     }
284 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>285     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
286         Poll::Ready(Ok(()))
287     }
288 }
289 
290 impl AsyncWrite for io::Cursor<&mut [u8]> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>291     fn poll_write(
292         mut self: Pin<&mut Self>,
293         _: &mut Context<'_>,
294         buf: &[u8],
295     ) -> Poll<io::Result<usize>> {
296         Poll::Ready(io::Write::write(&mut *self, buf))
297     }
298 
poll_write_vectored( mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>299     fn poll_write_vectored(
300         mut self: Pin<&mut Self>,
301         _: &mut Context<'_>,
302         bufs: &[IoSlice<'_>],
303     ) -> Poll<io::Result<usize>> {
304         Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
305     }
306 
is_write_vectored(&self) -> bool307     fn is_write_vectored(&self) -> bool {
308         true
309     }
310 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>311     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
312         Poll::Ready(io::Write::flush(&mut *self))
313     }
314 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>315     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
316         self.poll_flush(cx)
317     }
318 }
319 
320 impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>321     fn poll_write(
322         mut self: Pin<&mut Self>,
323         _: &mut Context<'_>,
324         buf: &[u8],
325     ) -> Poll<io::Result<usize>> {
326         Poll::Ready(io::Write::write(&mut *self, buf))
327     }
328 
poll_write_vectored( mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>329     fn poll_write_vectored(
330         mut self: Pin<&mut Self>,
331         _: &mut Context<'_>,
332         bufs: &[IoSlice<'_>],
333     ) -> Poll<io::Result<usize>> {
334         Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
335     }
336 
is_write_vectored(&self) -> bool337     fn is_write_vectored(&self) -> bool {
338         true
339     }
340 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>341     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
342         Poll::Ready(io::Write::flush(&mut *self))
343     }
344 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>345     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
346         self.poll_flush(cx)
347     }
348 }
349 
350 impl AsyncWrite for io::Cursor<Vec<u8>> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>351     fn poll_write(
352         mut self: Pin<&mut Self>,
353         _: &mut Context<'_>,
354         buf: &[u8],
355     ) -> Poll<io::Result<usize>> {
356         Poll::Ready(io::Write::write(&mut *self, buf))
357     }
358 
poll_write_vectored( mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>359     fn poll_write_vectored(
360         mut self: Pin<&mut Self>,
361         _: &mut Context<'_>,
362         bufs: &[IoSlice<'_>],
363     ) -> Poll<io::Result<usize>> {
364         Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
365     }
366 
is_write_vectored(&self) -> bool367     fn is_write_vectored(&self) -> bool {
368         true
369     }
370 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>371     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
372         Poll::Ready(io::Write::flush(&mut *self))
373     }
374 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>375     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
376         self.poll_flush(cx)
377     }
378 }
379 
380 impl AsyncWrite for io::Cursor<Box<[u8]>> {
poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>381     fn poll_write(
382         mut self: Pin<&mut Self>,
383         _: &mut Context<'_>,
384         buf: &[u8],
385     ) -> Poll<io::Result<usize>> {
386         Poll::Ready(io::Write::write(&mut *self, buf))
387     }
388 
poll_write_vectored( mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>389     fn poll_write_vectored(
390         mut self: Pin<&mut Self>,
391         _: &mut Context<'_>,
392         bufs: &[IoSlice<'_>],
393     ) -> Poll<io::Result<usize>> {
394         Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
395     }
396 
is_write_vectored(&self) -> bool397     fn is_write_vectored(&self) -> bool {
398         true
399     }
400 
poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>401     fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
402         Poll::Ready(io::Write::flush(&mut *self))
403     }
404 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>405     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
406         self.poll_flush(cx)
407     }
408 }
409