1 //! Asynchronous I/O
2 //!
3 //! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
4 //! `AsyncBufRead` traits, the asynchronous analogs to
5 //! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
6 //! that these traits integrate with the asynchronous task system.
7 //!
8 //! All items of this library are only available when the `std` feature of this
9 //! library is activated, and it is activated by default.
10 
11 #![no_std]
12 #![doc(test(
13     no_crate_inject,
14     attr(
15         deny(warnings, rust_2018_idioms, single_use_lifetimes),
16         allow(dead_code, unused_assignments, unused_variables)
17     )
18 ))]
19 #![warn(missing_docs, /* unsafe_op_in_unsafe_fn */)] // unsafe_op_in_unsafe_fn requires Rust 1.52
20 #![cfg_attr(docsrs, feature(doc_cfg))]
21 
22 #[cfg(feature = "std")]
23 extern crate std;
24 
25 #[cfg(feature = "std")]
26 mod if_std {
27     use std::boxed::Box;
28     use std::io;
29     use std::ops::DerefMut;
30     use std::pin::Pin;
31     use std::task::{Context, Poll};
32     use std::vec::Vec;
33 
34     // Re-export some types from `std::io` so that users don't have to deal
35     // with conflicts when `use`ing `futures::io` and `std::io`.
36     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37     #[doc(no_inline)]
38     pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
39 
40     /// Read bytes asynchronously.
41     ///
42     /// This trait is analogous to the `std::io::Read` trait, but integrates
43     /// with the asynchronous task system. In particular, the `poll_read`
44     /// method, unlike `Read::read`, will automatically queue the current task
45     /// for wakeup and return if data is not yet available, rather than blocking
46     /// the calling thread.
47     pub trait AsyncRead {
48         /// Attempt to read from the `AsyncRead` into `buf`.
49         ///
50         /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
51         ///
52         /// If no data is available for reading, the method returns
53         /// `Poll::Pending` and arranges for the current task (via
54         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
55         /// readable or is closed.
56         ///
57         /// # Implementation
58         ///
59         /// This function may not return errors of kind `WouldBlock` or
60         /// `Interrupted`.  Implementations must convert `WouldBlock` into
61         /// `Poll::Pending` and either internally retry or convert
62         /// `Interrupted` into another error kind.
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>63         fn poll_read(
64             self: Pin<&mut Self>,
65             cx: &mut Context<'_>,
66             buf: &mut [u8],
67         ) -> Poll<Result<usize>>;
68 
69         /// Attempt to read from the `AsyncRead` into `bufs` using vectored
70         /// IO operations.
71         ///
72         /// This method is similar to `poll_read`, but allows data to be read
73         /// into multiple buffers using a single operation.
74         ///
75         /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
76         ///
77         /// If no data is available for reading, the method returns
78         /// `Poll::Pending` and arranges for the current task (via
79         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
80         /// readable or is closed.
81         /// By default, this method delegates to using `poll_read` on the first
82         /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
83         /// support vectored IO should override this method.
84         ///
85         /// # Implementation
86         ///
87         /// This function may not return errors of kind `WouldBlock` or
88         /// `Interrupted`.  Implementations must convert `WouldBlock` into
89         /// `Poll::Pending` and either internally retry or convert
90         /// `Interrupted` into another error kind.
poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>>91         fn poll_read_vectored(
92             self: Pin<&mut Self>,
93             cx: &mut Context<'_>,
94             bufs: &mut [IoSliceMut<'_>],
95         ) -> Poll<Result<usize>> {
96             for b in bufs {
97                 if !b.is_empty() {
98                     return self.poll_read(cx, b);
99                 }
100             }
101 
102             self.poll_read(cx, &mut [])
103         }
104     }
105 
106     /// Write bytes asynchronously.
107     ///
108     /// This trait is analogous to the `std::io::Write` trait, but integrates
109     /// with the asynchronous task system. In particular, the `poll_write`
110     /// method, unlike `Write::write`, will automatically queue the current task
111     /// for wakeup and return if the writer cannot take more data, rather than blocking
112     /// the calling thread.
113     pub trait AsyncWrite {
114         /// Attempt to write bytes from `buf` into the object.
115         ///
116         /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
117         ///
118         /// If the object is not ready for writing, the method returns
119         /// `Poll::Pending` and arranges for the current task (via
120         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
121         /// writable or is closed.
122         ///
123         /// # Implementation
124         ///
125         /// This function may not return errors of kind `WouldBlock` or
126         /// `Interrupted`.  Implementations must convert `WouldBlock` into
127         /// `Poll::Pending` and either internally retry or convert
128         /// `Interrupted` into another error kind.
129         ///
130         /// `poll_write` must try to make progress by flushing the underlying object if
131         /// that is the only way the underlying object can become writable again.
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>>132         fn poll_write(
133             self: Pin<&mut Self>,
134             cx: &mut Context<'_>,
135             buf: &[u8],
136         ) -> Poll<Result<usize>>;
137 
138         /// Attempt to write bytes from `bufs` into the object using vectored
139         /// IO operations.
140         ///
141         /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
142         /// using a single operation.
143         ///
144         /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
145         ///
146         /// If the object is not ready for writing, the method returns
147         /// `Poll::Pending` and arranges for the current task (via
148         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
149         /// writable or is closed.
150         ///
151         /// By default, this method delegates to using `poll_write` on the first
152         /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
153         /// support vectored IO should override this method.
154         ///
155         /// # Implementation
156         ///
157         /// This function may not return errors of kind `WouldBlock` or
158         /// `Interrupted`.  Implementations must convert `WouldBlock` into
159         /// `Poll::Pending` and either internally retry or convert
160         /// `Interrupted` into another error kind.
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>>161         fn poll_write_vectored(
162             self: Pin<&mut Self>,
163             cx: &mut Context<'_>,
164             bufs: &[IoSlice<'_>],
165         ) -> Poll<Result<usize>> {
166             for b in bufs {
167                 if !b.is_empty() {
168                     return self.poll_write(cx, b);
169                 }
170             }
171 
172             self.poll_write(cx, &[])
173         }
174 
175         /// Attempt to flush the object, ensuring that any buffered data reach
176         /// their destination.
177         ///
178         /// On success, returns `Poll::Ready(Ok(()))`.
179         ///
180         /// If flushing cannot immediately complete, this method returns
181         /// `Poll::Pending` and arranges for the current task (via
182         /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
183         /// progress towards flushing.
184         ///
185         /// # Implementation
186         ///
187         /// This function may not return errors of kind `WouldBlock` or
188         /// `Interrupted`.  Implementations must convert `WouldBlock` into
189         /// `Poll::Pending` and either internally retry or convert
190         /// `Interrupted` into another error kind.
191         ///
192         /// It only makes sense to do anything here if you actually buffer data.
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>193         fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
194 
195         /// Attempt to close the object.
196         ///
197         /// On success, returns `Poll::Ready(Ok(()))`.
198         ///
199         /// If closing cannot immediately complete, this function returns
200         /// `Poll::Pending` and arranges for the current task (via
201         /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
202         /// progress towards closing.
203         ///
204         /// # Implementation
205         ///
206         /// This function may not return errors of kind `WouldBlock` or
207         /// `Interrupted`.  Implementations must convert `WouldBlock` into
208         /// `Poll::Pending` and either internally retry or convert
209         /// `Interrupted` into another error kind.
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>210         fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
211     }
212 
213     /// Seek bytes asynchronously.
214     ///
215     /// This trait is analogous to the `std::io::Seek` trait, but integrates
216     /// with the asynchronous task system. In particular, the `poll_seek`
217     /// method, unlike `Seek::seek`, will automatically queue the current task
218     /// for wakeup and return if data is not yet available, rather than blocking
219     /// the calling thread.
220     pub trait AsyncSeek {
221         /// Attempt to seek to an offset, in bytes, in a stream.
222         ///
223         /// A seek beyond the end of a stream is allowed, but behavior is defined
224         /// by the implementation.
225         ///
226         /// If the seek operation completed successfully,
227         /// this method returns the new position from the start of the stream.
228         /// That position can be used later with [`SeekFrom::Start`].
229         ///
230         /// # Errors
231         ///
232         /// Seeking to a negative offset is considered an error.
233         ///
234         /// # Implementation
235         ///
236         /// This function may not return errors of kind `WouldBlock` or
237         /// `Interrupted`.  Implementations must convert `WouldBlock` into
238         /// `Poll::Pending` and either internally retry or convert
239         /// `Interrupted` into another error kind.
poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>>240         fn poll_seek(
241             self: Pin<&mut Self>,
242             cx: &mut Context<'_>,
243             pos: SeekFrom,
244         ) -> Poll<Result<u64>>;
245     }
246 
247     /// Read bytes asynchronously.
248     ///
249     /// This trait is analogous to the `std::io::BufRead` trait, but integrates
250     /// with the asynchronous task system. In particular, the `poll_fill_buf`
251     /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
252     /// for wakeup and return if data is not yet available, rather than blocking
253     /// the calling thread.
254     pub trait AsyncBufRead: AsyncRead {
255         /// Attempt to return the contents of the internal buffer, filling it with more data
256         /// from the inner reader if it is empty.
257         ///
258         /// On success, returns `Poll::Ready(Ok(buf))`.
259         ///
260         /// If no data is available for reading, the method returns
261         /// `Poll::Pending` and arranges for the current task (via
262         /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
263         /// readable or is closed.
264         ///
265         /// This function is a lower-level call. It needs to be paired with the
266         /// [`consume`] method to function properly. When calling this
267         /// method, none of the contents will be "read" in the sense that later
268         /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
269         /// be called with the number of bytes that are consumed from this buffer to
270         /// ensure that the bytes are never returned twice.
271         ///
272         /// [`poll_read`]: AsyncRead::poll_read
273         /// [`consume`]: AsyncBufRead::consume
274         ///
275         /// An empty buffer returned indicates that the stream has reached EOF.
276         ///
277         /// # Implementation
278         ///
279         /// This function may not return errors of kind `WouldBlock` or
280         /// `Interrupted`.  Implementations must convert `WouldBlock` into
281         /// `Poll::Pending` and either internally retry or convert
282         /// `Interrupted` into another error kind.
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>283         fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
284 
285         /// Tells this buffer that `amt` bytes have been consumed from the buffer,
286         /// so they should no longer be returned in calls to [`poll_read`].
287         ///
288         /// This function is a lower-level call. It needs to be paired with the
289         /// [`poll_fill_buf`] method to function properly. This function does
290         /// not perform any I/O, it simply informs this object that some amount of
291         /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
292         /// no longer be returned. As such, this function may do odd things if
293         /// [`poll_fill_buf`] isn't called before calling it.
294         ///
295         /// The `amt` must be `<=` the number of bytes in the buffer returned by
296         /// [`poll_fill_buf`].
297         ///
298         /// [`poll_read`]: AsyncRead::poll_read
299         /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
consume(self: Pin<&mut Self>, amt: usize)300         fn consume(self: Pin<&mut Self>, amt: usize);
301     }
302 
303     macro_rules! deref_async_read {
304         () => {
305             fn poll_read(
306                 mut self: Pin<&mut Self>,
307                 cx: &mut Context<'_>,
308                 buf: &mut [u8],
309             ) -> Poll<Result<usize>> {
310                 Pin::new(&mut **self).poll_read(cx, buf)
311             }
312 
313             fn poll_read_vectored(
314                 mut self: Pin<&mut Self>,
315                 cx: &mut Context<'_>,
316                 bufs: &mut [IoSliceMut<'_>],
317             ) -> Poll<Result<usize>> {
318                 Pin::new(&mut **self).poll_read_vectored(cx, bufs)
319             }
320         };
321     }
322 
323     impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
324         deref_async_read!();
325     }
326 
327     impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
328         deref_async_read!();
329     }
330 
331     impl<P> AsyncRead for Pin<P>
332     where
333         P: DerefMut + Unpin,
334         P::Target: AsyncRead,
335     {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>>336         fn poll_read(
337             self: Pin<&mut Self>,
338             cx: &mut Context<'_>,
339             buf: &mut [u8],
340         ) -> Poll<Result<usize>> {
341             self.get_mut().as_mut().poll_read(cx, buf)
342         }
343 
poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>>344         fn poll_read_vectored(
345             self: Pin<&mut Self>,
346             cx: &mut Context<'_>,
347             bufs: &mut [IoSliceMut<'_>],
348         ) -> Poll<Result<usize>> {
349             self.get_mut().as_mut().poll_read_vectored(cx, bufs)
350         }
351     }
352 
353     macro_rules! delegate_async_read_to_stdio {
354         () => {
355             fn poll_read(
356                 mut self: Pin<&mut Self>,
357                 _: &mut Context<'_>,
358                 buf: &mut [u8],
359             ) -> Poll<Result<usize>> {
360                 Poll::Ready(io::Read::read(&mut *self, buf))
361             }
362 
363             fn poll_read_vectored(
364                 mut self: Pin<&mut Self>,
365                 _: &mut Context<'_>,
366                 bufs: &mut [IoSliceMut<'_>],
367             ) -> Poll<Result<usize>> {
368                 Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
369             }
370         };
371     }
372 
373     impl AsyncRead for &[u8] {
374         delegate_async_read_to_stdio!();
375     }
376 
377     macro_rules! deref_async_write {
378         () => {
379             fn poll_write(
380                 mut self: Pin<&mut Self>,
381                 cx: &mut Context<'_>,
382                 buf: &[u8],
383             ) -> Poll<Result<usize>> {
384                 Pin::new(&mut **self).poll_write(cx, buf)
385             }
386 
387             fn poll_write_vectored(
388                 mut self: Pin<&mut Self>,
389                 cx: &mut Context<'_>,
390                 bufs: &[IoSlice<'_>],
391             ) -> Poll<Result<usize>> {
392                 Pin::new(&mut **self).poll_write_vectored(cx, bufs)
393             }
394 
395             fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
396                 Pin::new(&mut **self).poll_flush(cx)
397             }
398 
399             fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
400                 Pin::new(&mut **self).poll_close(cx)
401             }
402         };
403     }
404 
405     impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
406         deref_async_write!();
407     }
408 
409     impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
410         deref_async_write!();
411     }
412 
413     impl<P> AsyncWrite for Pin<P>
414     where
415         P: DerefMut + Unpin,
416         P::Target: AsyncWrite,
417     {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>>418         fn poll_write(
419             self: Pin<&mut Self>,
420             cx: &mut Context<'_>,
421             buf: &[u8],
422         ) -> Poll<Result<usize>> {
423             self.get_mut().as_mut().poll_write(cx, buf)
424         }
425 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>>426         fn poll_write_vectored(
427             self: Pin<&mut Self>,
428             cx: &mut Context<'_>,
429             bufs: &[IoSlice<'_>],
430         ) -> Poll<Result<usize>> {
431             self.get_mut().as_mut().poll_write_vectored(cx, bufs)
432         }
433 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>434         fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
435             self.get_mut().as_mut().poll_flush(cx)
436         }
437 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>438         fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
439             self.get_mut().as_mut().poll_close(cx)
440         }
441     }
442 
443     macro_rules! delegate_async_write_to_stdio {
444         () => {
445             fn poll_write(
446                 mut self: Pin<&mut Self>,
447                 _: &mut Context<'_>,
448                 buf: &[u8],
449             ) -> Poll<Result<usize>> {
450                 Poll::Ready(io::Write::write(&mut *self, buf))
451             }
452 
453             fn poll_write_vectored(
454                 mut self: Pin<&mut Self>,
455                 _: &mut Context<'_>,
456                 bufs: &[IoSlice<'_>],
457             ) -> Poll<Result<usize>> {
458                 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
459             }
460 
461             fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
462                 Poll::Ready(io::Write::flush(&mut *self))
463             }
464 
465             fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
466                 self.poll_flush(cx)
467             }
468         };
469     }
470 
471     impl AsyncWrite for Vec<u8> {
472         delegate_async_write_to_stdio!();
473     }
474 
475     macro_rules! deref_async_seek {
476         () => {
477             fn poll_seek(
478                 mut self: Pin<&mut Self>,
479                 cx: &mut Context<'_>,
480                 pos: SeekFrom,
481             ) -> Poll<Result<u64>> {
482                 Pin::new(&mut **self).poll_seek(cx, pos)
483             }
484         };
485     }
486 
487     impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
488         deref_async_seek!();
489     }
490 
491     impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
492         deref_async_seek!();
493     }
494 
495     impl<P> AsyncSeek for Pin<P>
496     where
497         P: DerefMut + Unpin,
498         P::Target: AsyncSeek,
499     {
poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>>500         fn poll_seek(
501             self: Pin<&mut Self>,
502             cx: &mut Context<'_>,
503             pos: SeekFrom,
504         ) -> Poll<Result<u64>> {
505             self.get_mut().as_mut().poll_seek(cx, pos)
506         }
507     }
508 
509     macro_rules! deref_async_buf_read {
510         () => {
511             fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
512                 Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
513             }
514 
515             fn consume(mut self: Pin<&mut Self>, amt: usize) {
516                 Pin::new(&mut **self).consume(amt)
517             }
518         };
519     }
520 
521     impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
522         deref_async_buf_read!();
523     }
524 
525     impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
526         deref_async_buf_read!();
527     }
528 
529     impl<P> AsyncBufRead for Pin<P>
530     where
531         P: DerefMut + Unpin,
532         P::Target: AsyncBufRead,
533     {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>534         fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
535             self.get_mut().as_mut().poll_fill_buf(cx)
536         }
537 
consume(self: Pin<&mut Self>, amt: usize)538         fn consume(self: Pin<&mut Self>, amt: usize) {
539             self.get_mut().as_mut().consume(amt)
540         }
541     }
542 
543     macro_rules! delegate_async_buf_read_to_stdio {
544         () => {
545             fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
546                 Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
547             }
548 
549             fn consume(self: Pin<&mut Self>, amt: usize) {
550                 io::BufRead::consume(self.get_mut(), amt)
551             }
552         };
553     }
554 
555     impl AsyncBufRead for &[u8] {
556         delegate_async_buf_read_to_stdio!();
557     }
558 }
559 
560 #[cfg(feature = "std")]
561 pub use self::if_std::*;
562