//! Asynchronous I/O
//!
//! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
//! `AsyncBufRead` traits, the asynchronous analogs to
//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
//! that these traits integrate with the asynchronous task system.
//!
//! All items of this library are only available when the `std` feature of this
//! library is activated, and it is activated by default.

#![no_std]
#![doc(test(
    no_crate_inject,
    attr(
        deny(warnings, rust_2018_idioms, single_use_lifetimes),
        allow(dead_code, unused_assignments, unused_variables)
    )
))]
#![warn(missing_docs, /* unsafe_op_in_unsafe_fn */)] // unsafe_op_in_unsafe_fn requires Rust 1.52
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "std")]
extern crate std;

#[cfg(feature = "std")]
mod if_std {
    use std::boxed::Box;
    use std::io;
    use std::ops::DerefMut;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::vec::Vec;

    // Re-export some types from `std::io` so that users don't have to deal
    // with conflicts when `use`ing `futures::io` and `std::io`.
    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
    #[doc(no_inline)]
    pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};

    /// Read bytes asynchronously.
    ///
    /// This trait is analogous to the `std::io::Read` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_read`
    /// method, unlike `Read::read`, will automatically queue the current task
    /// for wakeup and return if data is not yet available, rather than blocking
    /// the calling thread.
    pub trait AsyncRead {
        /// Attempt to read from the `AsyncRead` into `buf`.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
        ///
        /// If no data is available for reading, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// readable or is closed.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize>>;

        /// Attempt to read from the `AsyncRead` into `bufs` using vectored
        /// IO operations.
        ///
        /// This method is similar to `poll_read`, but allows data to be read
        /// into multiple buffers using a single operation.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
        ///
        /// If no data is available for reading, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// readable or is closed.
        ///
        /// By default, this method delegates to using `poll_read` on the first
        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
        /// support vectored IO should override this method.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_read_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &mut [IoSliceMut<'_>],
        ) -> Poll<Result<usize>> {
            // Only the first non-empty buffer is used; the remaining buffers
            // are left untouched by this default implementation.
            for b in bufs {
                if !b.is_empty() {
                    return self.poll_read(cx, b);
                }
            }

            // Every buffer was empty (or `bufs` itself was): still forward a
            // zero-length read to `poll_read`.
            self.poll_read(cx, &mut [])
        }
    }

    /// Write bytes asynchronously.
    ///
    /// This trait is analogous to the `std::io::Write` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_write`
    /// method, unlike `Write::write`, will automatically queue the current task
    /// for wakeup and return if the writer cannot take more data, rather than blocking
    /// the calling thread.
    pub trait AsyncWrite {
        /// Attempt to write bytes from `buf` into the object.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
        ///
        /// If the object is not ready for writing, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// writable or is closed.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        ///
        /// `poll_write` must try to make progress by flushing the underlying object if
        /// that is the only way the underlying object can become writable again.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize>>;

        /// Attempt to write bytes from `bufs` into the object using vectored
        /// IO operations.
        ///
        /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
        /// using a single operation.
        ///
        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
        ///
        /// If the object is not ready for writing, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// writable or is closed.
        ///
        /// By default, this method delegates to using `poll_write` on the first
        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
        /// support vectored IO should override this method.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize>> {
            // Only the first non-empty buffer is written; the caller sees a
            // short write and is expected to call again for the rest.
            for b in bufs {
                if !b.is_empty() {
                    return self.poll_write(cx, b);
                }
            }

            // Nothing to write: still forward a zero-length write to
            // `poll_write`.
            self.poll_write(cx, &[])
        }

        /// Attempt to flush the object, ensuring that any buffered data reach
        /// their destination.
        ///
        /// On success, returns `Poll::Ready(Ok(()))`.
        ///
        /// If flushing cannot immediately complete, this method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
        /// progress towards flushing.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        ///
        /// It only makes sense to do anything here if you actually buffer data.
        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;

        /// Attempt to close the object.
        ///
        /// On success, returns `Poll::Ready(Ok(()))`.
        ///
        /// If closing cannot immediately complete, this function returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
        /// progress towards closing.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
    }

    /// Seek bytes asynchronously.
    ///
    /// This trait is analogous to the `std::io::Seek` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_seek`
    /// method, unlike `Seek::seek`, will automatically queue the current task
    /// for wakeup and return if the seek cannot complete yet, rather than
    /// blocking the calling thread.
    pub trait AsyncSeek {
        /// Attempt to seek to an offset, in bytes, in a stream.
        ///
        /// A seek beyond the end of a stream is allowed, but behavior is defined
        /// by the implementation.
        ///
        /// If the seek operation completed successfully,
        /// this method returns `Poll::Ready(Ok(position))`, where `position` is
        /// the new position from the start of the stream.
        /// That position can be used later with [`SeekFrom::Start`].
        ///
        /// If the seek cannot complete immediately, this method returns
        /// `Poll::Pending`.
        ///
        /// # Errors
        ///
        /// Seeking to a negative offset is considered an error.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        fn poll_seek(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<Result<u64>>;
    }

    /// Read bytes asynchronously through an internal buffer.
    ///
    /// This trait is analogous to the `std::io::BufRead` trait, but integrates
    /// with the asynchronous task system. In particular, the `poll_fill_buf`
    /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
    /// for wakeup and return if data is not yet available, rather than blocking
    /// the calling thread.
    pub trait AsyncBufRead: AsyncRead {
        /// Attempt to return the contents of the internal buffer, filling it with more data
        /// from the inner reader if it is empty.
        ///
        /// On success, returns `Poll::Ready(Ok(buf))`.
        ///
        /// If no data is available for reading, the method returns
        /// `Poll::Pending` and arranges for the current task (via
        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
        /// readable or is closed.
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`consume`] method to function properly. When calling this
        /// method, none of the contents will be "read" in the sense that later
        /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
        /// be called with the number of bytes that are consumed from this buffer to
        /// ensure that the bytes are never returned twice.
        ///
        /// An empty buffer returned indicates that the stream has reached EOF.
        ///
        /// # Implementation
        ///
        /// This function may not return errors of kind `WouldBlock` or
        /// `Interrupted`.  Implementations must convert `WouldBlock` into
        /// `Poll::Pending` and either internally retry or convert
        /// `Interrupted` into another error kind.
        ///
        /// [`poll_read`]: AsyncRead::poll_read
        /// [`consume`]: AsyncBufRead::consume
        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;

        /// Tells this buffer that `amt` bytes have been consumed from the buffer,
        /// so they should no longer be returned in calls to [`poll_read`].
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`poll_fill_buf`] method to function properly. This function does
        /// not perform any I/O, it simply informs this object that some amount of
        /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
        /// no longer be returned. As such, this function may do odd things if
        /// [`poll_fill_buf`] isn't called before calling it.
        ///
        /// The `amt` must be `<=` the number of bytes in the buffer returned by
        /// [`poll_fill_buf`].
        ///
        /// [`poll_read`]: AsyncRead::poll_read
        /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
        fn consume(self: Pin<&mut Self>, amt: usize);
    }

    // Body of `AsyncRead` for pointer-like wrappers (`Box<T>`, `&mut T`).
    //
    // `**self` goes through the `Pin` and then through the pointer to reach
    // the pointee `T`. Re-pinning it with `Pin::new` is permitted because
    // every impl that expands this macro requires `T: Unpin`.
    macro_rules! deref_async_read {
        () => {
            fn poll_read(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                buf: &mut [u8],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_read(cx, buf)
            }

            fn poll_read_vectored(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                bufs: &mut [IoSliceMut<'_>],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_read_vectored(cx, bufs)
            }
        };
    }

    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
        deref_async_read!();
    }

    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
        deref_async_read!();
    }

    // `P: Unpin` makes `Pin<P>` itself `Unpin`, so `get_mut` may hand out
    // `&mut Pin<P>`; `as_mut` then re-projects to `Pin<&mut P::Target>`
    // without ever unpinning the target.
    impl<P> AsyncRead for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncRead,
    {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_read(cx, buf)
        }

        fn poll_read_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &mut [IoSliceMut<'_>],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_read_vectored(cx, bufs)
        }
    }

    // Body of `AsyncRead` for types that already implement `std::io::Read`.
    //
    // The synchronous call is wrapped in `Poll::Ready` unconditionally and the
    // `Context` is ignored, so these impls never return `Poll::Pending`.
    macro_rules! delegate_async_read_to_stdio {
        () => {
            fn poll_read(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                buf: &mut [u8],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Read::read(&mut *self, buf))
            }

            fn poll_read_vectored(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                bufs: &mut [IoSliceMut<'_>],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
            }
        };
    }

    impl AsyncRead for &[u8] {
        delegate_async_read_to_stdio!();
    }

    // Body of `AsyncWrite` for pointer-like wrappers (`Box<T>`, `&mut T`).
    //
    // Same projection as `deref_async_read!`: reach the pointee via `**self`
    // and re-pin it, which requires the `T: Unpin` bound on the impls below.
    macro_rules! deref_async_write {
        () => {
            fn poll_write(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_write(cx, buf)
            }

            fn poll_write_vectored(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                bufs: &[IoSlice<'_>],
            ) -> Poll<Result<usize>> {
                Pin::new(&mut **self).poll_write_vectored(cx, bufs)
            }

            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
                Pin::new(&mut **self).poll_flush(cx)
            }

            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
                Pin::new(&mut **self).poll_close(cx)
            }
        };
    }

    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
        deref_async_write!();
    }

    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
        deref_async_write!();
    }

    // See the `AsyncRead for Pin<P>` impl: `get_mut().as_mut()` re-projects
    // the pin onto `P::Target` and relies on `P: Unpin`.
    impl<P> AsyncWrite for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncWrite,
    {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_write(cx, buf)
        }

        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<Result<usize>> {
            self.get_mut().as_mut().poll_write_vectored(cx, bufs)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
            self.get_mut().as_mut().poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
            self.get_mut().as_mut().poll_close(cx)
        }
    }

    // Body of `AsyncWrite` for types that already implement `std::io::Write`.
    //
    // Every call completes synchronously (`Poll::Ready`), and closing is
    // implemented as a plain flush.
    macro_rules! delegate_async_write_to_stdio {
        () => {
            fn poll_write(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Write::write(&mut *self, buf))
            }

            fn poll_write_vectored(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
                bufs: &[IoSlice<'_>],
            ) -> Poll<Result<usize>> {
                Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
            }

            fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
                Poll::Ready(io::Write::flush(&mut *self))
            }

            fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
                self.poll_flush(cx)
            }
        };
    }

    impl AsyncWrite for Vec<u8> {
        delegate_async_write_to_stdio!();
    }

    // Body of `AsyncSeek` for pointer-like wrappers (`Box<T>`, `&mut T`);
    // relies on the `T: Unpin` bound of the impls below for `Pin::new`.
    macro_rules! deref_async_seek {
        () => {
            fn poll_seek(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                pos: SeekFrom,
            ) -> Poll<Result<u64>> {
                Pin::new(&mut **self).poll_seek(cx, pos)
            }
        };
    }

    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
        deref_async_seek!();
    }

    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
        deref_async_seek!();
    }

    impl<P> AsyncSeek for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncSeek,
    {
        fn poll_seek(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<Result<u64>> {
            self.get_mut().as_mut().poll_seek(cx, pos)
        }
    }

    // Body of `AsyncBufRead` for pointer-like wrappers (`Box<T>`, `&mut T`).
    //
    // `poll_fill_buf` consumes the pin with `get_mut` (instead of reborrowing
    // through `mut self`) so that the returned slice can borrow for the full
    // lifetime of `self`.
    macro_rules! deref_async_buf_read {
        () => {
            fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
                Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
            }

            fn consume(mut self: Pin<&mut Self>, amt: usize) {
                Pin::new(&mut **self).consume(amt)
            }
        };
    }

    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
        deref_async_buf_read!();
    }

    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
        deref_async_buf_read!();
    }

    impl<P> AsyncBufRead for Pin<P>
    where
        P: DerefMut + Unpin,
        P::Target: AsyncBufRead,
    {
        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
            self.get_mut().as_mut().poll_fill_buf(cx)
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            self.get_mut().as_mut().consume(amt)
        }
    }

    // Body of `AsyncBufRead` for types that already implement
    // `std::io::BufRead`; both methods complete synchronously.
    macro_rules! delegate_async_buf_read_to_stdio {
        () => {
            fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
                Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
            }

            fn consume(self: Pin<&mut Self>, amt: usize) {
                io::BufRead::consume(self.get_mut(), amt)
            }
        };
    }

    impl AsyncBufRead for &[u8] {
        delegate_async_buf_read_to_stdio!();
    }
}

#[cfg(feature = "std")]
pub use self::if_std::*;