use crate::io::{Interest, Ready};
use crate::runtime::io::{ReadyEvent, Registration};
use crate::runtime::scheduler;

use mio::unix::SourceFd;
use std::error::Error;
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::task::{ready, Context, Poll};

/// Associates an IO object backed by a Unix file descriptor with the tokio
/// reactor, allowing for readiness to be polled. The file descriptor must be of
/// a type that can be used with the OS polling facilities (i.e., `poll`, `epoll`,
/// `kqueue`, etc), such as a network socket or pipe, and the file descriptor
/// must have the nonblocking mode set to true.
///
/// Creating an [`AsyncFd`] registers the file descriptor with the current tokio
/// Reactor, allowing you to directly await the file descriptor being readable
/// or writable. Once registered, the file descriptor remains registered until
/// the [`AsyncFd`] is dropped.
///
/// The [`AsyncFd`] takes ownership of an arbitrary object to represent the IO
/// object. It is intended that the inner object will handle closing the file
/// descriptor when it is dropped, avoiding resource leaks and ensuring that the
/// [`AsyncFd`] can clean up the registration before closing the file descriptor.
/// The [`AsyncFd::into_inner`] function can be used to extract the inner object
/// to retake control from the tokio IO reactor. The [`OwnedFd`] type is often
/// used as the inner object, as it is the simplest type that closes the fd on
/// drop.
///
/// The inner object is required to implement [`AsRawFd`]. This file descriptor
/// must not change while [`AsyncFd`] owns the inner object, i.e. the
/// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same
/// file descriptor when called multiple times. Failure to uphold this results
/// in unspecified behavior in the IO driver, which may include breaking
/// notifications for other sockets/etc.
///
/// Polling for readiness is done by calling the async functions [`readable`]
/// and [`writable`]. These functions complete when the associated readiness
/// condition is observed. Any number of tasks can query the same `AsyncFd` in
/// parallel, on the same or different conditions.
///
/// On some platforms, the readiness detecting mechanism relies on
/// edge-triggered notifications. This means that the OS will only notify Tokio
/// when the file descriptor transitions from not-ready to ready. For this to
/// work you should first try to read or write and only poll for readiness
/// if that fails with an error of [`std::io::ErrorKind::WouldBlock`].
///
/// Tokio internally tracks when it has received a ready notification, and when
/// readiness checking functions like [`readable`] and [`writable`] are called,
/// if the readiness flag is set, these async functions will complete
/// immediately. This however does mean that it is critical to ensure that this
/// ready flag is cleared when (and only when) the file descriptor ceases to be
/// ready. The [`AsyncFdReadyGuard`] returned from readiness checking functions
/// serves this function; after calling a readiness-checking async function,
/// you must use this [`AsyncFdReadyGuard`] to signal to tokio whether the file
/// descriptor is no longer in a ready state.
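///
/// For instance, a manual read loop along these lines (a sketch, assuming the
/// inner object is a nonblocking [`std::net::TcpStream`]) clears the flag only
/// when a read actually returns [`std::io::ErrorKind::WouldBlock`]:
///
/// ```no_run
/// use std::io::{self, Read};
/// use std::net::TcpStream;
/// use tokio::io::unix::AsyncFd;
///
/// async fn read_some(fd: &AsyncFd<TcpStream>, buf: &mut [u8]) -> io::Result<usize> {
///     loop {
///         let mut guard = fd.readable().await?;
///
///         match fd.get_ref().read(buf) {
///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
///                 // The readiness event was a false positive: clear the flag
///                 // and wait for the next edge-triggered notification.
///                 guard.clear_ready();
///             }
///             result => return result,
///         }
///     }
/// }
/// ```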
///
/// ## Use with a poll-based API
///
/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
/// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
/// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
/// Because these functions don't create a future to hold their state, they have
/// the limitation that only one task can wait on each direction (read or write)
/// at a time.
///
/// # Examples
///
/// This example shows how to make [`std::net::TcpStream`] asynchronous using
/// `AsyncFd`. It implements the read/write operations both as an `async fn`
/// and using the IO traits [`AsyncRead`] and [`AsyncWrite`].
///
/// ```no_run
/// use std::io::{self, Read, Write};
/// use std::net::TcpStream;
/// use std::pin::Pin;
/// use std::task::{ready, Context, Poll};
/// use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
/// use tokio::io::unix::AsyncFd;
///
/// pub struct AsyncTcpStream {
///     inner: AsyncFd<TcpStream>,
/// }
///
/// impl AsyncTcpStream {
///     pub fn new(tcp: TcpStream) -> io::Result<Self> {
///         tcp.set_nonblocking(true)?;
///         Ok(Self {
///             inner: AsyncFd::new(tcp)?,
///         })
///     }
///
///     pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
///         loop {
///             let mut guard = self.inner.readable().await?;
///
///             match guard.try_io(|inner| inner.get_ref().read(out)) {
///                 Ok(result) => return result,
///                 Err(_would_block) => continue,
///             }
///         }
///     }
///
///     pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
///         loop {
///             let mut guard = self.inner.writable().await?;
///
///             match guard.try_io(|inner| inner.get_ref().write(buf)) {
///                 Ok(result) => return result,
///                 Err(_would_block) => continue,
///             }
///         }
///     }
/// }
///
/// impl AsyncRead for AsyncTcpStream {
///     fn poll_read(
///         self: Pin<&mut Self>,
///         cx: &mut Context<'_>,
///         buf: &mut ReadBuf<'_>
///     ) -> Poll<io::Result<()>> {
///         loop {
///             let mut guard = ready!(self.inner.poll_read_ready(cx))?;
///
///             let unfilled = buf.initialize_unfilled();
///             match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
///                 Ok(Ok(len)) => {
///                     buf.advance(len);
///                     return Poll::Ready(Ok(()));
///                 },
///                 Ok(Err(err)) => return Poll::Ready(Err(err)),
///                 Err(_would_block) => continue,
///             }
///         }
///     }
/// }
///
/// impl AsyncWrite for AsyncTcpStream {
///     fn poll_write(
///         self: Pin<&mut Self>,
///         cx: &mut Context<'_>,
///         buf: &[u8]
///     ) -> Poll<io::Result<usize>> {
///         loop {
///             let mut guard = ready!(self.inner.poll_write_ready(cx))?;
///
///             match guard.try_io(|inner| inner.get_ref().write(buf)) {
///                 Ok(result) => return Poll::Ready(result),
///                 Err(_would_block) => continue,
///             }
///         }
///     }
///
///     fn poll_flush(
///         self: Pin<&mut Self>,
///         cx: &mut Context<'_>,
///     ) -> Poll<io::Result<()>> {
///         // tcp flush is a no-op
///         Poll::Ready(Ok(()))
///     }
///
///     fn poll_shutdown(
///         self: Pin<&mut Self>,
///         cx: &mut Context<'_>,
///     ) -> Poll<io::Result<()>> {
///         self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
///         Poll::Ready(Ok(()))
///     }
/// }
/// ```
///
/// [`readable`]: method@Self::readable
/// [`writable`]: method@Self::writable
/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
/// [`TcpStream::poll_read_ready`]: method@crate::net::TcpStream::poll_read_ready
/// [`AsyncRead`]: trait@crate::io::AsyncRead
/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
/// [`OwnedFd`]: struct@std::os::fd::OwnedFd
pub struct AsyncFd<T: AsRawFd> {
    registration: Registration,
    // The inner value is always present. The Option is required for `drop` and `into_inner`.
    // In all other methods `unwrap` is valid, and will never panic.
    inner: Option<T>,
}

/// Represents an IO-ready event detected on a particular file descriptor that
/// has not yet been acknowledged. This is a `must_use` structure to help ensure
/// that you do not forget to explicitly clear (or not clear) the event.
///
/// This type exposes an immutable reference to the underlying IO object.
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
    async_fd: &'a AsyncFd<T>,
    event: Option<ReadyEvent>,
}

/// Represents an IO-ready event detected on a particular file descriptor that
/// has not yet been acknowledged. This is a `must_use` structure to help ensure
/// that you do not forget to explicitly clear (or not clear) the event.
///
/// This type exposes a mutable reference to the underlying IO object.
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> {
    async_fd: &'a mut AsyncFd<T>,
    event: Option<ReadyEvent>,
}

impl<T: AsRawFd> AsyncFd<T> {
    /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
    /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
    /// time of creation.
    ///
    /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
    /// control, use [`AsyncFd::with_interest`].
    ///
    /// This method must be called in the context of a tokio runtime.
    ///
    /// # Panics
    ///
    /// This function panics if there is no current reactor set, or if the `rt`
    /// feature flag is not enabled.
    #[inline]
    #[track_caller]
    pub fn new(inner: T) -> io::Result<Self>
    where
        T: AsRawFd,
    {
        Self::with_interest(inner, Interest::READABLE | Interest::WRITABLE)
    }

    /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
    /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
    /// file descriptor is cached at the time of creation.
    ///
    /// # Panics
    ///
    /// This function panics if there is no current reactor set, or if the `rt`
    /// feature flag is not enabled.
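    ///
    /// # Examples
    ///
    /// A sketch (assuming a nonblocking [`std::net::UdpSocket`] that is only ever
    /// read from, so only read readiness needs to be tracked):
    ///
    /// ```no_run
    /// use tokio::io::{Interest, unix::AsyncFd};
    ///
    /// use std::io;
    /// use std::net::UdpSocket;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let socket = UdpSocket::bind("0.0.0.0:8080")?;
    ///     socket.set_nonblocking(true)?;
    ///
    ///     // Register only the read interest with the reactor.
    ///     let async_fd = AsyncFd::with_interest(socket, Interest::READABLE)?;
    ///
    ///     let mut buf = [0u8; 1024];
    ///     let n = async_fd
    ///         .async_io(Interest::READABLE, |inner| inner.recv(&mut buf))
    ///         .await?;
    ///     println!("received {n} bytes");
    ///
    ///     Ok(())
    /// }
    /// ```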
    #[inline]
    #[track_caller]
    pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self>
    where
        T: AsRawFd,
    {
        Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
    }

    #[track_caller]
    pub(crate) fn new_with_handle_and_interest(
        inner: T,
        handle: scheduler::Handle,
        interest: Interest,
    ) -> io::Result<Self> {
        Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into)
    }

    /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
    /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
    /// time of creation.
    ///
    /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
    /// control, use [`AsyncFd::try_with_interest`].
    ///
    /// This method must be called in the context of a tokio runtime.
    ///
    /// In the case of failure, it returns an [`AsyncFdTryNewError`] that contains the original object
    /// passed to this function.
    ///
    /// # Panics
    ///
    /// This function panics if there is no current reactor set, or if the `rt`
    /// feature flag is not enabled.
    #[inline]
    #[track_caller]
    pub fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>>
    where
        T: AsRawFd,
    {
        Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE)
    }

    /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
    /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
    /// file descriptor is cached at the time of creation.
    ///
    /// In the case of failure, it returns an [`AsyncFdTryNewError`] that contains the original object
    /// passed to this function.
    ///
    /// # Panics
    ///
    /// This function panics if there is no current reactor set, or if the `rt`
    /// feature flag is not enabled.
    #[inline]
    #[track_caller]
    pub fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>>
    where
        T: AsRawFd,
    {
        Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
    }

    #[track_caller]
    pub(crate) fn try_new_with_handle_and_interest(
        inner: T,
        handle: scheduler::Handle,
        interest: Interest,
    ) -> Result<Self, AsyncFdTryNewError<T>> {
        let fd = inner.as_raw_fd();

        match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) {
            Ok(registration) => Ok(AsyncFd {
                registration,
                inner: Some(inner),
            }),
            Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
        }
    }

    /// Returns a shared reference to the backing object of this [`AsyncFd`].
    #[inline]
    pub fn get_ref(&self) -> &T {
        self.inner.as_ref().unwrap()
    }

    /// Returns a mutable reference to the backing object of this [`AsyncFd`].
    #[inline]
    pub fn get_mut(&mut self) -> &mut T {
        self.inner.as_mut().unwrap()
    }

    fn take_inner(&mut self) -> Option<T> {
        let inner = self.inner.take()?;
        let fd = inner.as_raw_fd();

        let _ = self.registration.deregister(&mut SourceFd(&fd));

        Some(inner)
    }

    /// Deregisters this file descriptor and returns ownership of the backing
    /// object.
    pub fn into_inner(mut self) -> T {
        self.take_inner().unwrap()
    }

    /// Polls for read readiness.
    ///
    /// If the file descriptor is not currently ready for reading, this method
    /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
    /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
    ///
    /// Note that on multiple calls to [`poll_read_ready`] or
    /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
    /// most recent call is scheduled to receive a wakeup. (However,
    /// [`poll_write_ready`] retains a second, independent waker).
    ///
    /// This method is intended for cases where creating and pinning a future
    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
    /// preferred, as this supports polling from multiple tasks at once.
    ///
    /// This method takes `&self`, so it is possible to call this method
    /// concurrently with other methods on this struct. This method only
    /// provides shared access to the inner IO resource when handling the
    /// [`AsyncFdReadyGuard`].
    ///
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    /// [`readable`]: method@Self::readable
    /// [`Context`]: struct@std::task::Context
    /// [`Waker`]: struct@std::task::Waker
    /// [`Waker::wake`]: method@std::task::Waker::wake
    pub fn poll_read_ready<'a>(
        &'a self,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
        let event = ready!(self.registration.poll_read_ready(cx))?;

        Poll::Ready(Ok(AsyncFdReadyGuard {
            async_fd: self,
            event: Some(event),
        }))
    }

    /// Polls for read readiness.
    ///
    /// If the file descriptor is not currently ready for reading, this method
    /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
    /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
    ///
    /// Note that on multiple calls to [`poll_read_ready`] or
    /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
    /// most recent call is scheduled to receive a wakeup. (However,
    /// [`poll_write_ready`] retains a second, independent waker).
    ///
    /// This method is intended for cases where creating and pinning a future
    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
    /// preferred, as this supports polling from multiple tasks at once.
    ///
    /// This method takes `&mut self`, so it is possible to access the inner IO
    /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
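    ///
    /// # Examples
    ///
    /// A sketch of a poll-style read helper (assuming a nonblocking
    /// [`std::net::TcpStream`] already registered with the runtime) that uses the
    /// mutable guard to reach the inner stream:
    ///
    /// ```no_run
    /// use std::io::{self, Read};
    /// use std::net::TcpStream;
    /// use std::task::{ready, Context, Poll};
    /// use tokio::io::unix::AsyncFd;
    ///
    /// fn poll_read_some(
    ///     fd: &mut AsyncFd<TcpStream>,
    ///     cx: &mut Context<'_>,
    ///     out: &mut [u8],
    /// ) -> Poll<io::Result<usize>> {
    ///     loop {
    ///         let mut guard = ready!(fd.poll_read_ready_mut(cx))?;
    ///
    ///         // The closure receives `&mut AsyncFd<TcpStream>`, so the inner
    ///         // stream can be accessed mutably here.
    ///         match guard.try_io(|inner| inner.get_mut().read(out)) {
    ///             Ok(result) => return Poll::Ready(result),
    ///             Err(_would_block) => continue,
    ///         }
    ///     }
    /// }
    /// ```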
    ///
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    /// [`readable`]: method@Self::readable
    /// [`Context`]: struct@std::task::Context
    /// [`Waker`]: struct@std::task::Waker
    /// [`Waker::wake`]: method@std::task::Waker::wake
    pub fn poll_read_ready_mut<'a>(
        &'a mut self,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
        let event = ready!(self.registration.poll_read_ready(cx))?;

        Poll::Ready(Ok(AsyncFdReadyMutGuard {
            async_fd: self,
            event: Some(event),
        }))
    }

    /// Polls for write readiness.
    ///
    /// If the file descriptor is not currently ready for writing, this method
    /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
    /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
    ///
    /// Note that on multiple calls to [`poll_write_ready`] or
    /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
    /// most recent call is scheduled to receive a wakeup. (However,
    /// [`poll_read_ready`] retains a second, independent waker).
    ///
    /// This method is intended for cases where creating and pinning a future
    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
    /// preferred, as this supports polling from multiple tasks at once.
    ///
    /// This method takes `&self`, so it is possible to call this method
    /// concurrently with other methods on this struct. This method only
    /// provides shared access to the inner IO resource when handling the
    /// [`AsyncFdReadyGuard`].
    ///
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
    /// [`writable`]: method@Self::writable
    /// [`Context`]: struct@std::task::Context
    /// [`Waker`]: struct@std::task::Waker
    /// [`Waker::wake`]: method@std::task::Waker::wake
    pub fn poll_write_ready<'a>(
        &'a self,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
        let event = ready!(self.registration.poll_write_ready(cx))?;

        Poll::Ready(Ok(AsyncFdReadyGuard {
            async_fd: self,
            event: Some(event),
        }))
    }

    /// Polls for write readiness.
    ///
    /// If the file descriptor is not currently ready for writing, this method
    /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
    /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
    ///
    /// Note that on multiple calls to [`poll_write_ready`] or
    /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
    /// most recent call is scheduled to receive a wakeup. (However,
    /// [`poll_read_ready`] retains a second, independent waker).
    ///
    /// This method is intended for cases where creating and pinning a future
    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
    /// preferred, as this supports polling from multiple tasks at once.
    ///
    /// This method takes `&mut self`, so it is possible to access the inner IO
    /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
    ///
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
    /// [`writable`]: method@Self::writable
    /// [`Context`]: struct@std::task::Context
    /// [`Waker`]: struct@std::task::Waker
    /// [`Waker::wake`]: method@std::task::Waker::wake
    pub fn poll_write_ready_mut<'a>(
        &'a mut self,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
        let event = ready!(self.registration.poll_write_ready(cx))?;

        Poll::Ready(Ok(AsyncFdReadyMutGuard {
            async_fd: self,
            event: Some(event),
        }))
    }

    /// Waits for any of the requested ready states, returning a
    /// [`AsyncFdReadyGuard`] that must be dropped to resume
    /// polling for the requested ready states.
    ///
    /// The function may complete without the file descriptor being ready. This is a
    /// false-positive and attempting an operation will return with
    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
    /// [`Ready`] set, so you should always check the returned value and possibly
    /// wait again if the requested states are not set.
    ///
    /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
    /// When a combined interest is used, it is important to clear only the readiness
    /// that is actually observed to block. For instance when the combined
    /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
    /// read readiness should be cleared using the [`AsyncFdReadyGuard::clear_ready_matching`] method:
    /// `guard.clear_ready_matching(Ready::READABLE)`.
    /// Also clearing the write readiness in this case would be incorrect. The [`AsyncFdReadyGuard::clear_ready`]
    /// method clears all readiness flags.
    ///
    /// This method takes `&self`, so it is possible to call this method
    /// concurrently with other methods on this struct. This method only
    /// provides shared access to the inner IO resource when handling the
    /// [`AsyncFdReadyGuard`].
    ///
    /// # Examples
    ///
    /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
    /// splitting.
    ///
    /// ```no_run
    /// use std::error::Error;
    /// use std::io;
    /// use std::io::{Read, Write};
    /// use std::net::TcpStream;
    /// use tokio::io::unix::AsyncFd;
    /// use tokio::io::{Interest, Ready};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn Error>> {
    ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
    ///     stream.set_nonblocking(true)?;
    ///     let stream = AsyncFd::new(stream)?;
    ///
    ///     loop {
    ///         let mut guard = stream
    ///             .ready(Interest::READABLE | Interest::WRITABLE)
    ///             .await?;
    ///
    ///         if guard.ready().is_readable() {
    ///             let mut data = vec![0; 1024];
    ///             // Try to read data, this may still fail with `WouldBlock`
    ///             // if the readiness event is a false positive.
    ///             match stream.get_ref().read(&mut data) {
    ///                 Ok(n) => {
    ///                     println!("read {} bytes", n);
    ///                 }
    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///                     // a read has blocked, but a write might still succeed.
    ///                     // clear only the read readiness.
    ///                     guard.clear_ready_matching(Ready::READABLE);
    ///                     continue;
    ///                 }
    ///                 Err(e) => {
    ///                     return Err(e.into());
    ///                 }
    ///             }
    ///         }
    ///
    ///         if guard.ready().is_writable() {
    ///             // Try to write data, this may still fail with `WouldBlock`
    ///             // if the readiness event is a false positive.
    ///             match stream.get_ref().write(b"hello world") {
    ///                 Ok(n) => {
    ///                     println!("write {} bytes", n);
    ///                 }
    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///                     // a write has blocked, but a read might still succeed.
    ///                     // clear only the write readiness.
    ///                     guard.clear_ready_matching(Ready::WRITABLE);
    ///                     continue;
    ///                 }
    ///                 Err(e) => {
    ///                     return Err(e.into());
    ///                 }
    ///             }
    ///         }
    ///     }
    /// }
    /// ```
    pub async fn ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
        let event = self.registration.readiness(interest).await?;

        Ok(AsyncFdReadyGuard {
            async_fd: self,
            event: Some(event),
        })
    }

    /// Waits for any of the requested ready states, returning a
    /// [`AsyncFdReadyMutGuard`] that must be dropped to resume
    /// polling for the requested ready states.
    ///
    /// The function may complete without the file descriptor being ready. This is a
    /// false-positive and attempting an operation will return with
    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
    /// [`Ready`] set, so you should always check the returned value and possibly
    /// wait again if the requested states are not set.
    ///
    /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
    /// When a combined interest is used, it is important to clear only the readiness
    /// that is actually observed to block. For instance when the combined
    /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
    /// read readiness should be cleared using the [`AsyncFdReadyMutGuard::clear_ready_matching`] method:
    /// `guard.clear_ready_matching(Ready::READABLE)`.
    /// Also clearing the write readiness in this case would be incorrect.
    /// The [`AsyncFdReadyMutGuard::clear_ready`] method clears all readiness flags.
    ///
    /// This method takes `&mut self`, so it is possible to access the inner IO
    /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
    ///
    /// # Examples
    ///
    /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
    /// splitting.
624 /// 625 /// ```no_run 626 /// use std::error::Error; 627 /// use std::io; 628 /// use std::io::{Read, Write}; 629 /// use std::net::TcpStream; 630 /// use tokio::io::unix::AsyncFd; 631 /// use tokio::io::{Interest, Ready}; 632 /// 633 /// #[tokio::main] 634 /// async fn main() -> Result<(), Box<dyn Error>> { 635 /// let stream = TcpStream::connect("127.0.0.1:8080")?; 636 /// stream.set_nonblocking(true)?; 637 /// let mut stream = AsyncFd::new(stream)?; 638 /// 639 /// loop { 640 /// let mut guard = stream 641 /// .ready_mut(Interest::READABLE | Interest::WRITABLE) 642 /// .await?; 643 /// 644 /// if guard.ready().is_readable() { 645 /// let mut data = vec![0; 1024]; 646 /// // Try to read data, this may still fail with `WouldBlock` 647 /// // if the readiness event is a false positive. 648 /// match guard.get_inner_mut().read(&mut data) { 649 /// Ok(n) => { 650 /// println!("read {} bytes", n); 651 /// } 652 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 653 /// // a read has blocked, but a write might still succeed. 654 /// // clear only the read readiness. 655 /// guard.clear_ready_matching(Ready::READABLE); 656 /// continue; 657 /// } 658 /// Err(e) => { 659 /// return Err(e.into()); 660 /// } 661 /// } 662 /// } 663 /// 664 /// if guard.ready().is_writable() { 665 /// // Try to write data, this may still fail with `WouldBlock` 666 /// // if the readiness event is a false positive. 667 /// match guard.get_inner_mut().write(b"hello world") { 668 /// Ok(n) => { 669 /// println!("write {} bytes", n); 670 /// } 671 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 672 /// // a write has blocked, but a read might still succeed. 673 /// // clear only the write readiness. 674 /// guard.clear_ready_matching(Ready::WRITABLE); 675 /// continue; 676 /// } 677 /// Err(e) => { 678 /// return Err(e.into()); 679 /// } 680 /// } 681 /// } 682 /// } 683 /// } 684 /// ``` ready_mut( &mut self, interest: Interest, ) -> io::Result<AsyncFdReadyMutGuard<'_, T>>685 pub async fn ready_mut( 686 &mut self, 687 interest: Interest, 688 ) -> io::Result<AsyncFdReadyMutGuard<'_, T>> { 689 let event = self.registration.readiness(interest).await?; 690 691 Ok(AsyncFdReadyMutGuard { 692 async_fd: self, 693 event: Some(event), 694 }) 695 } 696 697 /// Waits for the file descriptor to become readable, returning a 698 /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness 699 /// polling. 700 /// 701 /// This method takes `&self`, so it is possible to call this method 702 /// concurrently with other methods on this struct. This method only 703 /// provides shared access to the inner IO resource when handling the 704 /// [`AsyncFdReadyGuard`]. 705 /// 706 /// # Cancel safety 707 /// 708 /// This method is cancel safe. Once a readiness event occurs, the method 709 /// will continue to return immediately until the readiness event is 710 /// consumed by an attempt to read or write that fails with `WouldBlock` or 711 /// `Poll::Pending`. 712 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering. readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>>713 pub async fn readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> { 714 self.ready(Interest::READABLE).await 715 } 716 717 /// Waits for the file descriptor to become readable, returning a 718 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume read-readiness 719 /// polling. 
720 /// 721 /// This method takes `&mut self`, so it is possible to access the inner IO 722 /// resource mutably when handling the [`AsyncFdReadyMutGuard`]. 723 /// 724 /// # Cancel safety 725 /// 726 /// This method is cancel safe. Once a readiness event occurs, the method 727 /// will continue to return immediately until the readiness event is 728 /// consumed by an attempt to read or write that fails with `WouldBlock` or 729 /// `Poll::Pending`. 730 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering. readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>>731 pub async fn readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> { 732 self.ready_mut(Interest::READABLE).await 733 } 734 735 /// Waits for the file descriptor to become writable, returning a 736 /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness 737 /// polling. 738 /// 739 /// This method takes `&self`, so it is possible to call this method 740 /// concurrently with other methods on this struct. This method only 741 /// provides shared access to the inner IO resource when handling the 742 /// [`AsyncFdReadyGuard`]. 743 /// 744 /// # Cancel safety 745 /// 746 /// This method is cancel safe. Once a readiness event occurs, the method 747 /// will continue to return immediately until the readiness event is 748 /// consumed by an attempt to read or write that fails with `WouldBlock` or 749 /// `Poll::Pending`. 750 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering. writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>>751 pub async fn writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> { 752 self.ready(Interest::WRITABLE).await 753 } 754 755 /// Waits for the file descriptor to become writable, returning a 756 /// [`AsyncFdReadyMutGuard`] that must be dropped to resume write-readiness 757 /// polling. 758 /// 759 /// This method takes `&mut self`, so it is possible to access the inner IO 760 /// resource mutably when handling the [`AsyncFdReadyMutGuard`]. 761 /// 762 /// # Cancel safety 763 /// 764 /// This method is cancel safe. Once a readiness event occurs, the method 765 /// will continue to return immediately until the readiness event is 766 /// consumed by an attempt to read or write that fails with `WouldBlock` or 767 /// `Poll::Pending`. 768 #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering. writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>>769 pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> { 770 self.ready_mut(Interest::WRITABLE).await 771 } 772 773 /// Reads or writes from the file descriptor using a user-provided IO operation. 774 /// 775 /// The `async_io` method is a convenience utility that waits for the file 776 /// descriptor to become ready, and then executes the provided IO operation. 777 /// Since file descriptors may be marked ready spuriously, the closure will 778 /// be called repeatedly until it returns something other than a 779 /// [`WouldBlock`] error. 
This is done using the following loop: 780 /// 781 /// ```no_run 782 /// # use std::io::{self, Result}; 783 /// # struct Dox<T> { inner: T } 784 /// # impl<T> Dox<T> { 785 /// # async fn writable(&self) -> Result<&Self> { 786 /// # Ok(self) 787 /// # } 788 /// # fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> { 789 /// # panic!() 790 /// # } 791 /// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> { 792 /// loop { 793 /// // or `readable` if called with the read interest. 794 /// let guard = self.writable().await?; 795 /// 796 /// match guard.try_io(&mut f) { 797 /// Ok(result) => return result, 798 /// Err(_would_block) => continue, 799 /// } 800 /// } 801 /// } 802 /// # } 803 /// ``` 804 /// 805 /// The closure should only return a [`WouldBlock`] error if it has performed 806 /// an IO operation on the file descriptor that failed due to the file descriptor not being 807 /// ready. Returning a [`WouldBlock`] error in any other situation will 808 /// incorrectly clear the readiness flag, which can cause the file descriptor to 809 /// behave incorrectly. 810 /// 811 /// The closure should not perform the IO operation using any of the methods 812 /// defined on the Tokio [`AsyncFd`] type, as this will mess with the 813 /// readiness flag and can cause the file descriptor to behave incorrectly. 814 /// 815 /// This method is not intended to be used with combined interests. 816 /// The closure should perform only one type of IO operation, so it should not 817 /// require more than one ready state. This method may panic or sleep forever 818 /// if it is called with a combined interest. 819 /// 820 /// # Examples 821 /// 822 /// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io` 823 /// method waits for readiness, and retries if the send operation does block. This example 824 /// is equivalent to the one given for [`try_io`]. 825 /// 826 /// ```no_run 827 /// use tokio::io::{Interest, unix::AsyncFd}; 828 /// 829 /// use std::io; 830 /// use std::net::UdpSocket; 831 /// 832 /// #[tokio::main] 833 /// async fn main() -> io::Result<()> { 834 /// let socket = UdpSocket::bind("0.0.0.0:8080")?; 835 /// socket.set_nonblocking(true)?; 836 /// let async_fd = AsyncFd::new(socket)?; 837 /// 838 /// let written = async_fd 839 /// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2])) 840 /// .await?; 841 /// 842 /// println!("wrote {written} bytes"); 843 /// 844 /// Ok(()) 845 /// } 846 /// ``` 847 /// 848 /// [`try_io`]: AsyncFdReadyGuard::try_io 849 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock async_io<R>( &self, interest: Interest, mut f: impl FnMut(&T) -> io::Result<R>, ) -> io::Result<R>850 pub async fn async_io<R>( 851 &self, 852 interest: Interest, 853 mut f: impl FnMut(&T) -> io::Result<R>, 854 ) -> io::Result<R> { 855 self.registration 856 .async_io(interest, || f(self.get_ref())) 857 .await 858 } 859 860 /// Reads or writes from the file descriptor using a user-provided IO operation. 861 /// 862 /// The behavior is the same as [`async_io`], except that the closure can mutate the inner 863 /// value of the [`AsyncFd`]. 
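    ///
    /// # Examples
    ///
    /// A sketch mirroring the [`async_io`] example (assuming a nonblocking
    /// [`std::net::UdpSocket`]), where the closure receives `&mut UdpSocket`:
    ///
    /// ```no_run
    /// use tokio::io::{Interest, unix::AsyncFd};
    ///
    /// use std::io;
    /// use std::net::UdpSocket;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let socket = UdpSocket::bind("0.0.0.0:8080")?;
    ///     socket.set_nonblocking(true)?;
    ///     let mut async_fd = AsyncFd::new(socket)?;
    ///
    ///     let mut buf = [0u8; 1024];
    ///     let received = async_fd
    ///         .async_io_mut(Interest::READABLE, |inner| inner.recv(&mut buf))
    ///         .await?;
    ///
    ///     println!("received {received} bytes");
    ///
    ///     Ok(())
    /// }
    /// ```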
    ///
    /// [`async_io`]: AsyncFd::async_io
    pub async fn async_io_mut<R>(
        &mut self,
        interest: Interest,
        mut f: impl FnMut(&mut T) -> io::Result<R>,
    ) -> io::Result<R> {
        self.registration
            .async_io(interest, || f(self.inner.as_mut().unwrap()))
            .await
    }

    /// Tries to read or write from the file descriptor using a user-provided IO operation.
    ///
    /// If the file descriptor is ready, the provided closure is called. The closure
    /// should attempt to perform an IO operation on the file descriptor by manually
    /// calling the appropriate syscall. If the operation fails because the
    /// file descriptor is not actually ready, then the closure should return a
    /// `WouldBlock` error and the readiness flag is cleared. The return value
    /// of the closure is then returned by `try_io`.
    ///
    /// If the file descriptor is not ready, then the closure is not called
    /// and a `WouldBlock` error is returned.
    ///
    /// The closure should only return a `WouldBlock` error if it has performed
    /// an IO operation on the file descriptor that failed due to the file descriptor not being
    /// ready. Returning a `WouldBlock` error in any other situation will
    /// incorrectly clear the readiness flag, which can cause the file descriptor to
    /// behave incorrectly.
    ///
    /// The closure should not perform the IO operation using any of the methods
    /// defined on the Tokio `AsyncFd` type, as this will mess with the
    /// readiness flag and can cause the file descriptor to behave incorrectly.
    ///
    /// This method is not intended to be used with combined interests.
    /// The closure should perform only one type of IO operation, so it should not
    /// require more than one ready state. This method may panic or sleep forever
    /// if it is called with a combined interest.
    pub fn try_io<R>(
        &self,
        interest: Interest,
        f: impl FnOnce(&T) -> io::Result<R>,
    ) -> io::Result<R> {
        self.registration
            .try_io(interest, || f(self.inner.as_ref().unwrap()))
    }

    /// Tries to read or write from the file descriptor using a user-provided IO operation.
    ///
    /// The behavior is the same as [`try_io`], except that the closure can mutate the inner
    /// value of the [`AsyncFd`].
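    ///
    /// # Examples
    ///
    /// A sketch (assuming a nonblocking [`std::net::UdpSocket`]) that attempts a
    /// single receive without waiting; if no read readiness is currently recorded,
    /// the closure is not called and a `WouldBlock` error is returned:
    ///
    /// ```no_run
    /// use tokio::io::{Interest, unix::AsyncFd};
    ///
    /// use std::io;
    /// use std::net::UdpSocket;
    ///
    /// fn try_recv(async_fd: &mut AsyncFd<UdpSocket>, buf: &mut [u8]) -> io::Result<usize> {
    ///     // The closure receives `&mut UdpSocket` and is only run if readiness
    ///     // has already been observed for this file descriptor.
    ///     async_fd.try_io_mut(Interest::READABLE, |inner| inner.recv(buf))
    /// }
    /// ```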
915 /// 916 /// [`try_io`]: AsyncFd::try_io try_io_mut<R>( &mut self, interest: Interest, f: impl FnOnce(&mut T) -> io::Result<R>, ) -> io::Result<R>917 pub fn try_io_mut<R>( 918 &mut self, 919 interest: Interest, 920 f: impl FnOnce(&mut T) -> io::Result<R>, 921 ) -> io::Result<R> { 922 self.registration 923 .try_io(interest, || f(self.inner.as_mut().unwrap())) 924 } 925 } 926 927 impl<T: AsRawFd> AsRawFd for AsyncFd<T> { as_raw_fd(&self) -> RawFd928 fn as_raw_fd(&self) -> RawFd { 929 self.inner.as_ref().unwrap().as_raw_fd() 930 } 931 } 932 933 impl<T: AsRawFd> std::os::unix::io::AsFd for AsyncFd<T> { as_fd(&self) -> std::os::unix::io::BorrowedFd<'_>934 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> { 935 unsafe { std::os::unix::io::BorrowedFd::borrow_raw(self.as_raw_fd()) } 936 } 937 } 938 939 impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result940 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 941 f.debug_struct("AsyncFd") 942 .field("inner", &self.inner) 943 .finish() 944 } 945 } 946 947 impl<T: AsRawFd> Drop for AsyncFd<T> { drop(&mut self)948 fn drop(&mut self) { 949 let _ = self.take_inner(); 950 } 951 } 952 953 impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { 954 /// Indicates to tokio that the file descriptor is no longer ready. All 955 /// internal readiness flags will be cleared, and tokio will wait for the 956 /// next edge-triggered readiness notification from the OS. 957 /// 958 /// This function is commonly used with guards returned by [`AsyncFd::readable`] and 959 /// [`AsyncFd::writable`]. 960 /// 961 /// It is critical that this function not be called unless your code 962 /// _actually observes_ that the file descriptor is _not_ ready. Do not call 963 /// it simply because, for example, a read succeeded; it should be called 964 /// when a read is observed to block. 965 /// 966 /// This method only clears readiness events that happened before the creation of this guard. 967 /// In other words, if the IO resource becomes ready between the creation of the guard and 968 /// this call to `clear_ready`, then the readiness is not actually cleared. clear_ready(&mut self)969 pub fn clear_ready(&mut self) { 970 if let Some(event) = self.event.take() { 971 self.async_fd.registration.clear_readiness(event); 972 } 973 } 974 975 /// Indicates to tokio that the file descriptor no longer has a specific readiness. 976 /// The internal readiness flag will be cleared, and tokio will wait for the 977 /// next edge-triggered readiness notification from the OS. 978 /// 979 /// This function is useful in combination with the [`AsyncFd::ready`] method when a 980 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used. 981 /// 982 /// It is critical that this function not be called unless your code 983 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`. 984 /// Do not call it simply because, for example, a read succeeded; it should be called 985 /// when a read is observed to block. Only clear the specific readiness that is observed to 986 /// block. For example when a read blocks when using a combined interest, 987 /// only clear `Ready::READABLE`. 988 /// 989 /// This method only clears readiness events that happened before the creation of this guard. 990 /// In other words, if the IO resource becomes ready between the creation of the guard and 991 /// this call to `clear_ready`, then the readiness is not actually cleared. 
992 /// 993 /// # Examples 994 /// 995 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without 996 /// splitting. 997 /// 998 /// ```no_run 999 /// use std::error::Error; 1000 /// use std::io; 1001 /// use std::io::{Read, Write}; 1002 /// use std::net::TcpStream; 1003 /// use tokio::io::unix::AsyncFd; 1004 /// use tokio::io::{Interest, Ready}; 1005 /// 1006 /// #[tokio::main] 1007 /// async fn main() -> Result<(), Box<dyn Error>> { 1008 /// let stream = TcpStream::connect("127.0.0.1:8080")?; 1009 /// stream.set_nonblocking(true)?; 1010 /// let stream = AsyncFd::new(stream)?; 1011 /// 1012 /// loop { 1013 /// let mut guard = stream 1014 /// .ready(Interest::READABLE | Interest::WRITABLE) 1015 /// .await?; 1016 /// 1017 /// if guard.ready().is_readable() { 1018 /// let mut data = vec![0; 1024]; 1019 /// // Try to read data, this may still fail with `WouldBlock` 1020 /// // if the readiness event is a false positive. 1021 /// match stream.get_ref().read(&mut data) { 1022 /// Ok(n) => { 1023 /// println!("read {} bytes", n); 1024 /// } 1025 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1026 /// // a read has blocked, but a write might still succeed. 1027 /// // clear only the read readiness. 1028 /// guard.clear_ready_matching(Ready::READABLE); 1029 /// continue; 1030 /// } 1031 /// Err(e) => { 1032 /// return Err(e.into()); 1033 /// } 1034 /// } 1035 /// } 1036 /// 1037 /// if guard.ready().is_writable() { 1038 /// // Try to write data, this may still fail with `WouldBlock` 1039 /// // if the readiness event is a false positive. 1040 /// match stream.get_ref().write(b"hello world") { 1041 /// Ok(n) => { 1042 /// println!("write {} bytes", n); 1043 /// } 1044 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1045 /// // a write has blocked, but a read might still succeed. 1046 /// // clear only the write readiness. 1047 /// guard.clear_ready_matching(Ready::WRITABLE); 1048 /// continue; 1049 /// } 1050 /// Err(e) => { 1051 /// return Err(e.into()); 1052 /// } 1053 /// } 1054 /// } 1055 /// } 1056 /// } 1057 /// ``` clear_ready_matching(&mut self, ready: Ready)1058 pub fn clear_ready_matching(&mut self, ready: Ready) { 1059 if let Some(mut event) = self.event.take() { 1060 self.async_fd 1061 .registration 1062 .clear_readiness(event.with_ready(ready)); 1063 1064 // the event is no longer ready for the readiness that was just cleared 1065 event.ready = event.ready - ready; 1066 1067 if !event.ready.is_empty() { 1068 self.event = Some(event); 1069 } 1070 } 1071 } 1072 1073 /// This method should be invoked when you intentionally want to keep the 1074 /// ready flag asserted. 1075 /// 1076 /// While this function is itself a no-op, it satisfies the `#[must_use]` 1077 /// constraint on the [`AsyncFdReadyGuard`] type. retain_ready(&mut self)1078 pub fn retain_ready(&mut self) { 1079 // no-op 1080 } 1081 1082 /// Get the [`Ready`] value associated with this guard. 1083 /// 1084 /// This method will return the empty readiness state if 1085 /// [`AsyncFdReadyGuard::clear_ready`] has been called on 1086 /// the guard. 1087 /// 1088 /// [`Ready`]: crate::io::Ready ready(&self) -> Ready1089 pub fn ready(&self) -> Ready { 1090 match &self.event { 1091 Some(event) => event.ready, 1092 None => Ready::EMPTY, 1093 } 1094 } 1095 1096 /// Performs the provided IO operation. 
1097 /// 1098 /// If `f` returns a [`WouldBlock`] error, the readiness state associated 1099 /// with this file descriptor is cleared, and the method returns 1100 /// `Err(TryIoError::WouldBlock)`. You will typically need to poll the 1101 /// `AsyncFd` again when this happens. 1102 /// 1103 /// This method helps ensure that the readiness state of the underlying file 1104 /// descriptor remains in sync with the tokio-side readiness state, by 1105 /// clearing the tokio-side state only when a [`WouldBlock`] condition 1106 /// occurs. It is the responsibility of the caller to ensure that `f` 1107 /// returns [`WouldBlock`] only if the file descriptor that originated this 1108 /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to 1109 /// create this `AsyncFdReadyGuard`. 1110 /// 1111 /// # Examples 1112 /// 1113 /// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting 1114 /// for write-readiness and retrying when the send operation does block are explicit. 1115 /// This example can be written more succinctly using [`AsyncFd::async_io`]. 1116 /// 1117 /// ```no_run 1118 /// use tokio::io::unix::AsyncFd; 1119 /// 1120 /// use std::io; 1121 /// use std::net::UdpSocket; 1122 /// 1123 /// #[tokio::main] 1124 /// async fn main() -> io::Result<()> { 1125 /// let socket = UdpSocket::bind("0.0.0.0:8080")?; 1126 /// socket.set_nonblocking(true)?; 1127 /// let async_fd = AsyncFd::new(socket)?; 1128 /// 1129 /// let written = loop { 1130 /// let mut guard = async_fd.writable().await?; 1131 /// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) { 1132 /// Ok(result) => { 1133 /// break result?; 1134 /// } 1135 /// Err(_would_block) => { 1136 /// // try_io already cleared the file descriptor's readiness state 1137 /// continue; 1138 /// } 1139 /// } 1140 /// }; 1141 /// 1142 /// println!("wrote {written} bytes"); 1143 /// 1144 /// Ok(()) 1145 /// } 1146 /// ``` 1147 /// 1148 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock 1149 // Alias for old name in 0.x 1150 #[cfg_attr(docsrs, doc(alias = "with_io"))] try_io<R>( &mut self, f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError>1151 pub fn try_io<R>( 1152 &mut self, 1153 f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>, 1154 ) -> Result<io::Result<R>, TryIoError> { 1155 let result = f(self.async_fd); 1156 1157 match result { 1158 Err(err) if err.kind() == io::ErrorKind::WouldBlock => { 1159 self.clear_ready(); 1160 Err(TryIoError(())) 1161 } 1162 result => Ok(result), 1163 } 1164 } 1165 1166 /// Returns a shared reference to the inner [`AsyncFd`]. get_ref(&self) -> &'a AsyncFd<Inner>1167 pub fn get_ref(&self) -> &'a AsyncFd<Inner> { 1168 self.async_fd 1169 } 1170 1171 /// Returns a shared reference to the backing object of the inner [`AsyncFd`]. get_inner(&self) -> &'a Inner1172 pub fn get_inner(&self) -> &'a Inner { 1173 self.get_ref().get_ref() 1174 } 1175 } 1176 1177 impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> { 1178 /// Indicates to tokio that the file descriptor is no longer ready. All 1179 /// internal readiness flags will be cleared, and tokio will wait for the 1180 /// next edge-triggered readiness notification from the OS. 1181 /// 1182 /// This function is commonly used with guards returned by [`AsyncFd::readable_mut`] and 1183 /// [`AsyncFd::writable_mut`]. 1184 /// 1185 /// It is critical that this function not be called unless your code 1186 /// _actually observes_ that the file descriptor is _not_ ready. 
Do not call 1187 /// it simply because, for example, a read succeeded; it should be called 1188 /// when a read is observed to block. 1189 /// 1190 /// This method only clears readiness events that happened before the creation of this guard. 1191 /// In other words, if the IO resource becomes ready between the creation of the guard and 1192 /// this call to `clear_ready`, then the readiness is not actually cleared. clear_ready(&mut self)1193 pub fn clear_ready(&mut self) { 1194 if let Some(event) = self.event.take() { 1195 self.async_fd.registration.clear_readiness(event); 1196 } 1197 } 1198 1199 /// Indicates to tokio that the file descriptor no longer has a specific readiness. 1200 /// The internal readiness flag will be cleared, and tokio will wait for the 1201 /// next edge-triggered readiness notification from the OS. 1202 /// 1203 /// This function is useful in combination with the [`AsyncFd::ready_mut`] method when a 1204 /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used. 1205 /// 1206 /// It is critical that this function not be called unless your code 1207 /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`. 1208 /// Do not call it simply because, for example, a read succeeded; it should be called 1209 /// when a read is observed to block. Only clear the specific readiness that is observed to 1210 /// block. For example when a read blocks when using a combined interest, 1211 /// only clear `Ready::READABLE`. 1212 /// 1213 /// This method only clears readiness events that happened before the creation of this guard. 1214 /// In other words, if the IO resource becomes ready between the creation of the guard and 1215 /// this call to `clear_ready`, then the readiness is not actually cleared. 1216 /// 1217 /// # Examples 1218 /// 1219 /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without 1220 /// splitting. 1221 /// 1222 /// ```no_run 1223 /// use std::error::Error; 1224 /// use std::io; 1225 /// use std::io::{Read, Write}; 1226 /// use std::net::TcpStream; 1227 /// use tokio::io::unix::AsyncFd; 1228 /// use tokio::io::{Interest, Ready}; 1229 /// 1230 /// #[tokio::main] 1231 /// async fn main() -> Result<(), Box<dyn Error>> { 1232 /// let stream = TcpStream::connect("127.0.0.1:8080")?; 1233 /// stream.set_nonblocking(true)?; 1234 /// let mut stream = AsyncFd::new(stream)?; 1235 /// 1236 /// loop { 1237 /// let mut guard = stream 1238 /// .ready_mut(Interest::READABLE | Interest::WRITABLE) 1239 /// .await?; 1240 /// 1241 /// if guard.ready().is_readable() { 1242 /// let mut data = vec![0; 1024]; 1243 /// // Try to read data, this may still fail with `WouldBlock` 1244 /// // if the readiness event is a false positive. 1245 /// match guard.get_inner_mut().read(&mut data) { 1246 /// Ok(n) => { 1247 /// println!("read {} bytes", n); 1248 /// } 1249 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1250 /// // a read has blocked, but a write might still succeed. 1251 /// // clear only the read readiness. 1252 /// guard.clear_ready_matching(Ready::READABLE); 1253 /// continue; 1254 /// } 1255 /// Err(e) => { 1256 /// return Err(e.into()); 1257 /// } 1258 /// } 1259 /// } 1260 /// 1261 /// if guard.ready().is_writable() { 1262 /// // Try to write data, this may still fail with `WouldBlock` 1263 /// // if the readiness event is a false positive. 
1264 /// match guard.get_inner_mut().write(b"hello world") { 1265 /// Ok(n) => { 1266 /// println!("write {} bytes", n); 1267 /// } 1268 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1269 /// // a write has blocked, but a read might still succeed. 1270 /// // clear only the write readiness. 1271 /// guard.clear_ready_matching(Ready::WRITABLE); 1272 /// continue; 1273 /// } 1274 /// Err(e) => { 1275 /// return Err(e.into()); 1276 /// } 1277 /// } 1278 /// } 1279 /// } 1280 /// } 1281 /// ``` clear_ready_matching(&mut self, ready: Ready)1282 pub fn clear_ready_matching(&mut self, ready: Ready) { 1283 if let Some(mut event) = self.event.take() { 1284 self.async_fd 1285 .registration 1286 .clear_readiness(event.with_ready(ready)); 1287 1288 // the event is no longer ready for the readiness that was just cleared 1289 event.ready = event.ready - ready; 1290 1291 if !event.ready.is_empty() { 1292 self.event = Some(event); 1293 } 1294 } 1295 } 1296 1297 /// This method should be invoked when you intentionally want to keep the 1298 /// ready flag asserted. 1299 /// 1300 /// While this function is itself a no-op, it satisfies the `#[must_use]` 1301 /// constraint on the [`AsyncFdReadyGuard`] type. retain_ready(&mut self)1302 pub fn retain_ready(&mut self) { 1303 // no-op 1304 } 1305 1306 /// Get the [`Ready`] value associated with this guard. 1307 /// 1308 /// This method will return the empty readiness state if 1309 /// [`AsyncFdReadyGuard::clear_ready`] has been called on 1310 /// the guard. 1311 /// 1312 /// [`Ready`]: super::Ready ready(&self) -> Ready1313 pub fn ready(&self) -> Ready { 1314 match &self.event { 1315 Some(event) => event.ready, 1316 None => Ready::EMPTY, 1317 } 1318 } 1319 1320 /// Performs the provided IO operation. 1321 /// 1322 /// If `f` returns a [`WouldBlock`] error, the readiness state associated 1323 /// with this file descriptor is cleared, and the method returns 1324 /// `Err(TryIoError::WouldBlock)`. You will typically need to poll the 1325 /// `AsyncFd` again when this happens. 1326 /// 1327 /// This method helps ensure that the readiness state of the underlying file 1328 /// descriptor remains in sync with the tokio-side readiness state, by 1329 /// clearing the tokio-side state only when a [`WouldBlock`] condition 1330 /// occurs. It is the responsibility of the caller to ensure that `f` 1331 /// returns [`WouldBlock`] only if the file descriptor that originated this 1332 /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to 1333 /// create this `AsyncFdReadyGuard`. 1334 /// 1335 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock try_io<R>( &mut self, f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError>1336 pub fn try_io<R>( 1337 &mut self, 1338 f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>, 1339 ) -> Result<io::Result<R>, TryIoError> { 1340 let result = f(self.async_fd); 1341 1342 match result { 1343 Err(err) if err.kind() == io::ErrorKind::WouldBlock => { 1344 self.clear_ready(); 1345 Err(TryIoError(())) 1346 } 1347 result => Ok(result), 1348 } 1349 } 1350 1351 /// Returns a shared reference to the inner [`AsyncFd`]. get_ref(&self) -> &AsyncFd<Inner>1352 pub fn get_ref(&self) -> &AsyncFd<Inner> { 1353 self.async_fd 1354 } 1355 1356 /// Returns a mutable reference to the inner [`AsyncFd`]. 
get_mut(&mut self) -> &mut AsyncFd<Inner>1357 pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> { 1358 self.async_fd 1359 } 1360 1361 /// Returns a shared reference to the backing object of the inner [`AsyncFd`]. get_inner(&self) -> &Inner1362 pub fn get_inner(&self) -> &Inner { 1363 self.get_ref().get_ref() 1364 } 1365 1366 /// Returns a mutable reference to the backing object of the inner [`AsyncFd`]. get_inner_mut(&mut self) -> &mut Inner1367 pub fn get_inner_mut(&mut self) -> &mut Inner { 1368 self.get_mut().get_mut() 1369 } 1370 } 1371 1372 impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1373 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 1374 f.debug_struct("ReadyGuard") 1375 .field("async_fd", &self.async_fd) 1376 .finish() 1377 } 1378 } 1379 1380 impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<'a, T> { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 1382 f.debug_struct("MutReadyGuard") 1383 .field("async_fd", &self.async_fd) 1384 .finish() 1385 } 1386 } 1387 1388 /// The error type returned by [`try_io`]. 1389 /// 1390 /// This error indicates that the IO resource returned a [`WouldBlock`] error. 1391 /// 1392 /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock 1393 /// [`try_io`]: method@AsyncFdReadyGuard::try_io 1394 #[derive(Debug)] 1395 pub struct TryIoError(()); 1396 1397 /// Error returned by [`try_new`] or [`try_with_interest`]. 1398 /// 1399 /// [`try_new`]: AsyncFd::try_new 1400 /// [`try_with_interest`]: AsyncFd::try_with_interest 1401 pub struct AsyncFdTryNewError<T> { 1402 inner: T, 1403 cause: io::Error, 1404 } 1405 1406 impl<T> AsyncFdTryNewError<T> { 1407 /// Returns the original object passed to [`try_new`] or [`try_with_interest`] 1408 /// alongside the error that caused these functions to fail. 1409 /// 1410 /// [`try_new`]: AsyncFd::try_new 1411 /// [`try_with_interest`]: AsyncFd::try_with_interest into_parts(self) -> (T, io::Error)1412 pub fn into_parts(self) -> (T, io::Error) { 1413 (self.inner, self.cause) 1414 } 1415 } 1416 1417 impl<T> fmt::Display for AsyncFdTryNewError<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1419 fmt::Display::fmt(&self.cause, f) 1420 } 1421 } 1422 1423 impl<T> fmt::Debug for AsyncFdTryNewError<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1424 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1425 fmt::Debug::fmt(&self.cause, f) 1426 } 1427 } 1428 1429 impl<T> Error for AsyncFdTryNewError<T> { source(&self) -> Option<&(dyn Error + 'static)>1430 fn source(&self) -> Option<&(dyn Error + 'static)> { 1431 Some(&self.cause) 1432 } 1433 } 1434 1435 impl<T> From<AsyncFdTryNewError<T>> for io::Error { from(value: AsyncFdTryNewError<T>) -> Self1436 fn from(value: AsyncFdTryNewError<T>) -> Self { 1437 value.cause 1438 } 1439 } 1440