1 use crate::io::{Interest, Ready};
2 use crate::runtime::io::{ReadyEvent, Registration};
3 use crate::runtime::scheduler;
4 
5 use mio::unix::SourceFd;
6 use std::error::Error;
7 use std::fmt;
8 use std::io;
9 use std::os::unix::io::{AsRawFd, RawFd};
10 use std::task::{ready, Context, Poll};
11 
12 /// Associates an IO object backed by a Unix file descriptor with the tokio
13 /// reactor, allowing for readiness to be polled. The file descriptor must be of
14 /// a type that can be used with the OS polling facilities (ie, `poll`, `epoll`,
15 /// `kqueue`, etc), such as a network socket or pipe, and the file descriptor
16 /// must have the nonblocking mode set to true.
17 ///
18 /// Creating an [`AsyncFd`] registers the file descriptor with the current tokio
19 /// Reactor, allowing you to directly await the file descriptor being readable
20 /// or writable. Once registered, the file descriptor remains registered until
21 /// the [`AsyncFd`] is dropped.
22 ///
23 /// The [`AsyncFd`] takes ownership of an arbitrary object to represent the IO
24 /// object. It is intended that the inner object will handle closing the file
25 /// descriptor when it is dropped, avoiding resource leaks and ensuring that the
26 /// [`AsyncFd`] can clean up the registration before closing the file descriptor.
27 /// The [`AsyncFd::into_inner`] function can be used to extract the inner object
28 /// to retake control from the tokio IO reactor. The [`OwnedFd`] type is often
29 /// used as the inner object, as it is the simplest type that closes the fd on
30 /// drop.
31 ///
32 /// The inner object is required to implement [`AsRawFd`]. This file descriptor
33 /// must not change while [`AsyncFd`] owns the inner object, i.e. the
34 /// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same
35 /// file descriptor when called multiple times. Failure to uphold this results
36 /// in unspecified behavior in the IO driver, which may include breaking
37 /// notifications for other sockets/etc.
38 ///
39 /// Polling for readiness is done by calling the async functions [`readable`]
40 /// and [`writable`]. These functions complete when the associated readiness
41 /// condition is observed. Any number of tasks can query the same `AsyncFd` in
42 /// parallel, on the same or different conditions.
43 ///
44 /// On some platforms, the readiness detecting mechanism relies on
45 /// edge-triggered notifications. This means that the OS will only notify Tokio
46 /// when the file descriptor transitions from not-ready to ready. For this to
47 /// work you should first try to read or write and only poll for readiness
48 /// if that fails with an error of [`std::io::ErrorKind::WouldBlock`].
49 ///
50 /// Tokio internally tracks when it has received a ready notification, and when
51 /// readiness checking functions like [`readable`] and [`writable`] are called,
52 /// if the readiness flag is set, these async functions will complete
53 /// immediately. This however does mean that it is critical to ensure that this
54 /// ready flag is cleared when (and only when) the file descriptor ceases to be
55 /// ready. The [`AsyncFdReadyGuard`] returned from readiness checking functions
56 /// serves this function; after calling a readiness-checking async function,
57 /// you must use this [`AsyncFdReadyGuard`] to signal to tokio whether the file
58 /// descriptor is no longer in a ready state.
59 ///
/// ## Use with a poll-based API
61 ///
62 /// In some cases it may be desirable to use `AsyncFd` from APIs similar to
63 /// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
64 /// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
65 /// Because these functions don't create a future to hold their state, they have
66 /// the limitation that only one task can wait on each direction (read or write)
67 /// at a time.
68 ///
69 /// # Examples
70 ///
71 /// This example shows how to turn [`std::net::TcpStream`] asynchronous using
72 /// `AsyncFd`.  It implements the read/write operations both as an `async fn`
73 /// and using the IO traits [`AsyncRead`] and [`AsyncWrite`].
74 ///
75 /// ```no_run
76 /// use std::io::{self, Read, Write};
77 /// use std::net::TcpStream;
78 /// use std::pin::Pin;
79 /// use std::task::{ready, Context, Poll};
80 /// use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
81 /// use tokio::io::unix::AsyncFd;
82 ///
83 /// pub struct AsyncTcpStream {
84 ///     inner: AsyncFd<TcpStream>,
85 /// }
86 ///
87 /// impl AsyncTcpStream {
88 ///     pub fn new(tcp: TcpStream) -> io::Result<Self> {
89 ///         tcp.set_nonblocking(true)?;
90 ///         Ok(Self {
91 ///             inner: AsyncFd::new(tcp)?,
92 ///         })
93 ///     }
94 ///
95 ///     pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
96 ///         loop {
97 ///             let mut guard = self.inner.readable().await?;
98 ///
99 ///             match guard.try_io(|inner| inner.get_ref().read(out)) {
100 ///                 Ok(result) => return result,
101 ///                 Err(_would_block) => continue,
102 ///             }
103 ///         }
104 ///     }
105 ///
106 ///     pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
107 ///         loop {
108 ///             let mut guard = self.inner.writable().await?;
109 ///
110 ///             match guard.try_io(|inner| inner.get_ref().write(buf)) {
111 ///                 Ok(result) => return result,
112 ///                 Err(_would_block) => continue,
113 ///             }
114 ///         }
115 ///     }
116 /// }
117 ///
118 /// impl AsyncRead for AsyncTcpStream {
119 ///     fn poll_read(
120 ///         self: Pin<&mut Self>,
121 ///         cx: &mut Context<'_>,
122 ///         buf: &mut ReadBuf<'_>
123 ///     ) -> Poll<io::Result<()>> {
124 ///         loop {
125 ///             let mut guard = ready!(self.inner.poll_read_ready(cx))?;
126 ///
127 ///             let unfilled = buf.initialize_unfilled();
128 ///             match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
129 ///                 Ok(Ok(len)) => {
130 ///                     buf.advance(len);
131 ///                     return Poll::Ready(Ok(()));
132 ///                 },
133 ///                 Ok(Err(err)) => return Poll::Ready(Err(err)),
134 ///                 Err(_would_block) => continue,
135 ///             }
136 ///         }
137 ///     }
138 /// }
139 ///
140 /// impl AsyncWrite for AsyncTcpStream {
141 ///     fn poll_write(
142 ///         self: Pin<&mut Self>,
143 ///         cx: &mut Context<'_>,
144 ///         buf: &[u8]
145 ///     ) -> Poll<io::Result<usize>> {
146 ///         loop {
147 ///             let mut guard = ready!(self.inner.poll_write_ready(cx))?;
148 ///
149 ///             match guard.try_io(|inner| inner.get_ref().write(buf)) {
150 ///                 Ok(result) => return Poll::Ready(result),
151 ///                 Err(_would_block) => continue,
152 ///             }
153 ///         }
154 ///     }
155 ///
156 ///     fn poll_flush(
157 ///         self: Pin<&mut Self>,
158 ///         cx: &mut Context<'_>,
159 ///     ) -> Poll<io::Result<()>> {
160 ///         // tcp flush is a no-op
161 ///         Poll::Ready(Ok(()))
162 ///     }
163 ///
164 ///     fn poll_shutdown(
165 ///         self: Pin<&mut Self>,
166 ///         cx: &mut Context<'_>,
167 ///     ) -> Poll<io::Result<()>> {
168 ///         self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
169 ///         Poll::Ready(Ok(()))
170 ///     }
171 /// }
172 /// ```
173 ///
174 /// [`readable`]: method@Self::readable
175 /// [`writable`]: method@Self::writable
176 /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
177 /// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
178 /// [`AsyncRead`]: trait@crate::io::AsyncRead
179 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
180 /// [`OwnedFd`]: struct@std::os::fd::OwnedFd
pub struct AsyncFd<T: AsRawFd> {
    // Registration of the wrapped file descriptor with the IO driver. Readiness
    // polling and deregistration both go through this handle.
    registration: Registration,
    // The inner value is always present. The `Option` is required only so that `drop`
    // and `into_inner` can move the value out. In all other methods `unwrap` is valid
    // and will never panic.
    inner: Option<T>,
}
187 
188 /// Represents an IO-ready event detected on a particular file descriptor that
189 /// has not yet been acknowledged. This is a `must_use` structure to help ensure
190 /// that you do not forget to explicitly clear (or not clear) the event.
191 ///
192 /// This type exposes an immutable reference to the underlying IO object.
193 #[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
    // Shared borrow of the `AsyncFd` that produced this guard.
    async_fd: &'a AsyncFd<T>,
    // The readiness event this guard stands for; set to `Some` when the guard is built.
    // NOTE(review): presumably becomes `None` once the event has been acknowledged —
    // confirm against the guard's methods.
    event: Option<ReadyEvent>,
}
198 
199 /// Represents an IO-ready event detected on a particular file descriptor that
200 /// has not yet been acknowledged. This is a `must_use` structure to help ensure
201 /// that you do not forget to explicitly clear (or not clear) the event.
202 ///
203 /// This type exposes a mutable reference to the underlying IO object.
204 #[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> {
    // Exclusive borrow of the `AsyncFd` that produced this guard.
    async_fd: &'a mut AsyncFd<T>,
    // The readiness event this guard stands for; set to `Some` when the guard is built.
    // NOTE(review): presumably becomes `None` once the event has been acknowledged —
    // confirm against the guard's methods.
    event: Option<ReadyEvent>,
}
209 
210 impl<T: AsRawFd> AsyncFd<T> {
211     /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
212     /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
213     /// time of creation.
214     ///
215     /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
216     /// control, use [`AsyncFd::with_interest`].
217     ///
218     /// This method must be called in the context of a tokio runtime.
219     ///
220     /// # Panics
221     ///
222     /// This function panics if there is no current reactor set, or if the `rt`
223     /// feature flag is not enabled.
224     #[inline]
225     #[track_caller]
new(inner: T) -> io::Result<Self> where T: AsRawFd,226     pub fn new(inner: T) -> io::Result<Self>
227     where
228         T: AsRawFd,
229     {
230         Self::with_interest(inner, Interest::READABLE | Interest::WRITABLE)
231     }
232 
233     /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
234     /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
235     /// file descriptor is cached at the time of creation.
236     ///
237     /// # Panics
238     ///
239     /// This function panics if there is no current reactor set, or if the `rt`
240     /// feature flag is not enabled.
241     #[inline]
242     #[track_caller]
with_interest(inner: T, interest: Interest) -> io::Result<Self> where T: AsRawFd,243     pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self>
244     where
245         T: AsRawFd,
246     {
247         Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
248     }
249 
250     #[track_caller]
new_with_handle_and_interest( inner: T, handle: scheduler::Handle, interest: Interest, ) -> io::Result<Self>251     pub(crate) fn new_with_handle_and_interest(
252         inner: T,
253         handle: scheduler::Handle,
254         interest: Interest,
255     ) -> io::Result<Self> {
256         Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into)
257     }
258 
259     /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
260     /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
261     /// time of creation.
262     ///
263     /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
264     /// control, use [`AsyncFd::try_with_interest`].
265     ///
266     /// This method must be called in the context of a tokio runtime.
267     ///
268     /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
269     /// passed to this function.
270     ///
271     /// # Panics
272     ///
273     /// This function panics if there is no current reactor set, or if the `rt`
274     /// feature flag is not enabled.
275     #[inline]
276     #[track_caller]
try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>> where T: AsRawFd,277     pub fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>>
278     where
279         T: AsRawFd,
280     {
281         Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE)
282     }
283 
284     /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
285     /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
286     /// file descriptor is cached at the time of creation.
287     ///
288     /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
289     /// passed to this function.
290     ///
291     /// # Panics
292     ///
293     /// This function panics if there is no current reactor set, or if the `rt`
294     /// feature flag is not enabled.
295     #[inline]
296     #[track_caller]
try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>> where T: AsRawFd,297     pub fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>>
298     where
299         T: AsRawFd,
300     {
301         Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
302     }
303 
304     #[track_caller]
try_new_with_handle_and_interest( inner: T, handle: scheduler::Handle, interest: Interest, ) -> Result<Self, AsyncFdTryNewError<T>>305     pub(crate) fn try_new_with_handle_and_interest(
306         inner: T,
307         handle: scheduler::Handle,
308         interest: Interest,
309     ) -> Result<Self, AsyncFdTryNewError<T>> {
310         let fd = inner.as_raw_fd();
311 
312         match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) {
313             Ok(registration) => Ok(AsyncFd {
314                 registration,
315                 inner: Some(inner),
316             }),
317             Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
318         }
319     }
320 
321     /// Returns a shared reference to the backing object of this [`AsyncFd`].
322     #[inline]
get_ref(&self) -> &T323     pub fn get_ref(&self) -> &T {
324         self.inner.as_ref().unwrap()
325     }
326 
327     /// Returns a mutable reference to the backing object of this [`AsyncFd`].
328     #[inline]
get_mut(&mut self) -> &mut T329     pub fn get_mut(&mut self) -> &mut T {
330         self.inner.as_mut().unwrap()
331     }
332 
take_inner(&mut self) -> Option<T>333     fn take_inner(&mut self) -> Option<T> {
334         let inner = self.inner.take()?;
335         let fd = inner.as_raw_fd();
336 
337         let _ = self.registration.deregister(&mut SourceFd(&fd));
338 
339         Some(inner)
340     }
341 
342     /// Deregisters this file descriptor and returns ownership of the backing
343     /// object.
into_inner(mut self) -> T344     pub fn into_inner(mut self) -> T {
345         self.take_inner().unwrap()
346     }
347 
348     /// Polls for read readiness.
349     ///
350     /// If the file descriptor is not currently ready for reading, this method
351     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
352     /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
353     ///
354     /// Note that on multiple calls to [`poll_read_ready`] or
355     /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
356     /// most recent call is scheduled to receive a wakeup. (However,
357     /// [`poll_write_ready`] retains a second, independent waker).
358     ///
359     /// This method is intended for cases where creating and pinning a future
360     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
361     /// preferred, as this supports polling from multiple tasks at once.
362     ///
363     /// This method takes `&self`, so it is possible to call this method
364     /// concurrently with other methods on this struct. This method only
365     /// provides shared access to the inner IO resource when handling the
366     /// [`AsyncFdReadyGuard`].
367     ///
368     /// [`poll_read_ready`]: method@Self::poll_read_ready
369     /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
370     /// [`poll_write_ready`]: method@Self::poll_write_ready
371     /// [`readable`]: method@Self::readable
372     /// [`Context`]: struct@std::task::Context
373     /// [`Waker`]: struct@std::task::Waker
374     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_read_ready<'a>( &'a self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>>375     pub fn poll_read_ready<'a>(
376         &'a self,
377         cx: &mut Context<'_>,
378     ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
379         let event = ready!(self.registration.poll_read_ready(cx))?;
380 
381         Poll::Ready(Ok(AsyncFdReadyGuard {
382             async_fd: self,
383             event: Some(event),
384         }))
385     }
386 
387     /// Polls for read readiness.
388     ///
389     /// If the file descriptor is not currently ready for reading, this method
390     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
391     /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
392     ///
393     /// Note that on multiple calls to [`poll_read_ready`] or
394     /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
395     /// most recent call is scheduled to receive a wakeup. (However,
396     /// [`poll_write_ready`] retains a second, independent waker).
397     ///
398     /// This method is intended for cases where creating and pinning a future
399     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
400     /// preferred, as this supports polling from multiple tasks at once.
401     ///
402     /// This method takes `&mut self`, so it is possible to access the inner IO
403     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
404     ///
405     /// [`poll_read_ready`]: method@Self::poll_read_ready
406     /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
407     /// [`poll_write_ready`]: method@Self::poll_write_ready
408     /// [`readable`]: method@Self::readable
409     /// [`Context`]: struct@std::task::Context
410     /// [`Waker`]: struct@std::task::Waker
411     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_read_ready_mut<'a>( &'a mut self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>>412     pub fn poll_read_ready_mut<'a>(
413         &'a mut self,
414         cx: &mut Context<'_>,
415     ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
416         let event = ready!(self.registration.poll_read_ready(cx))?;
417 
418         Poll::Ready(Ok(AsyncFdReadyMutGuard {
419             async_fd: self,
420             event: Some(event),
421         }))
422     }
423 
424     /// Polls for write readiness.
425     ///
426     /// If the file descriptor is not currently ready for writing, this method
427     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
428     /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
429     ///
430     /// Note that on multiple calls to [`poll_write_ready`] or
431     /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
432     /// most recent call is scheduled to receive a wakeup. (However,
433     /// [`poll_read_ready`] retains a second, independent waker).
434     ///
435     /// This method is intended for cases where creating and pinning a future
436     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
437     /// preferred, as this supports polling from multiple tasks at once.
438     ///
439     /// This method takes `&self`, so it is possible to call this method
440     /// concurrently with other methods on this struct. This method only
441     /// provides shared access to the inner IO resource when handling the
442     /// [`AsyncFdReadyGuard`].
443     ///
444     /// [`poll_read_ready`]: method@Self::poll_read_ready
445     /// [`poll_write_ready`]: method@Self::poll_write_ready
446     /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
    /// [`writable`]: method@Self::writable
448     /// [`Context`]: struct@std::task::Context
449     /// [`Waker`]: struct@std::task::Waker
450     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_write_ready<'a>( &'a self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>>451     pub fn poll_write_ready<'a>(
452         &'a self,
453         cx: &mut Context<'_>,
454     ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
455         let event = ready!(self.registration.poll_write_ready(cx))?;
456 
457         Poll::Ready(Ok(AsyncFdReadyGuard {
458             async_fd: self,
459             event: Some(event),
460         }))
461     }
462 
463     /// Polls for write readiness.
464     ///
465     /// If the file descriptor is not currently ready for writing, this method
466     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
467     /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
468     ///
469     /// Note that on multiple calls to [`poll_write_ready`] or
470     /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
471     /// most recent call is scheduled to receive a wakeup. (However,
472     /// [`poll_read_ready`] retains a second, independent waker).
473     ///
474     /// This method is intended for cases where creating and pinning a future
475     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
476     /// preferred, as this supports polling from multiple tasks at once.
477     ///
478     /// This method takes `&mut self`, so it is possible to access the inner IO
479     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
480     ///
481     /// [`poll_read_ready`]: method@Self::poll_read_ready
482     /// [`poll_write_ready`]: method@Self::poll_write_ready
483     /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
    /// [`writable`]: method@Self::writable
485     /// [`Context`]: struct@std::task::Context
486     /// [`Waker`]: struct@std::task::Waker
487     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_write_ready_mut<'a>( &'a mut self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>>488     pub fn poll_write_ready_mut<'a>(
489         &'a mut self,
490         cx: &mut Context<'_>,
491     ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
492         let event = ready!(self.registration.poll_write_ready(cx))?;
493 
494         Poll::Ready(Ok(AsyncFdReadyMutGuard {
495             async_fd: self,
496             event: Some(event),
497         }))
498     }
499 
500     /// Waits for any of the requested ready states, returning a
501     /// [`AsyncFdReadyGuard`] that must be dropped to resume
502     /// polling for the requested ready states.
503     ///
504     /// The function may complete without the file descriptor being ready. This is a
505     /// false-positive and attempting an operation will return with
506     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
507     /// [`Ready`] set, so you should always check the returned value and possibly
508     /// wait again if the requested states are not set.
509     ///
510     /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
511     /// When a combined interest is used, it is important to clear only the readiness
512     /// that is actually observed to block. For instance when the combined
513     /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
514     /// read readiness should be cleared using the [`AsyncFdReadyGuard::clear_ready_matching`] method:
515     /// `guard.clear_ready_matching(Ready::READABLE)`.
516     /// Also clearing the write readiness in this case would be incorrect. The [`AsyncFdReadyGuard::clear_ready`]
517     /// method clears all readiness flags.
518     ///
519     /// This method takes `&self`, so it is possible to call this method
520     /// concurrently with other methods on this struct. This method only
521     /// provides shared access to the inner IO resource when handling the
522     /// [`AsyncFdReadyGuard`].
523     ///
524     /// # Examples
525     ///
526     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
527     /// splitting.
528     ///
529     /// ```no_run
530     /// use std::error::Error;
531     /// use std::io;
532     /// use std::io::{Read, Write};
533     /// use std::net::TcpStream;
534     /// use tokio::io::unix::AsyncFd;
535     /// use tokio::io::{Interest, Ready};
536     ///
537     /// #[tokio::main]
538     /// async fn main() -> Result<(), Box<dyn Error>> {
539     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
540     ///     stream.set_nonblocking(true)?;
541     ///     let stream = AsyncFd::new(stream)?;
542     ///
543     ///     loop {
544     ///         let mut guard = stream
545     ///             .ready(Interest::READABLE | Interest::WRITABLE)
546     ///             .await?;
547     ///
548     ///         if guard.ready().is_readable() {
549     ///             let mut data = vec![0; 1024];
550     ///             // Try to read data, this may still fail with `WouldBlock`
551     ///             // if the readiness event is a false positive.
552     ///             match stream.get_ref().read(&mut data) {
553     ///                 Ok(n) => {
554     ///                     println!("read {} bytes", n);
555     ///                 }
556     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
557     ///                     // a read has blocked, but a write might still succeed.
558     ///                     // clear only the read readiness.
559     ///                     guard.clear_ready_matching(Ready::READABLE);
560     ///                     continue;
561     ///                 }
562     ///                 Err(e) => {
563     ///                     return Err(e.into());
564     ///                 }
565     ///             }
566     ///         }
567     ///
568     ///         if guard.ready().is_writable() {
569     ///             // Try to write data, this may still fail with `WouldBlock`
570     ///             // if the readiness event is a false positive.
571     ///             match stream.get_ref().write(b"hello world") {
572     ///                 Ok(n) => {
573     ///                     println!("write {} bytes", n);
574     ///                 }
575     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
576     ///                     // a write has blocked, but a read might still succeed.
577     ///                     // clear only the write readiness.
578     ///                     guard.clear_ready_matching(Ready::WRITABLE);
579     ///                     continue;
580     ///                 }
581     ///                 Err(e) => {
582     ///                     return Err(e.into());
583     ///                 }
584     ///             }
585     ///         }
586     ///     }
587     /// }
588     /// ```
ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>>589     pub async fn ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
590         let event = self.registration.readiness(interest).await?;
591 
592         Ok(AsyncFdReadyGuard {
593             async_fd: self,
594             event: Some(event),
595         })
596     }
597 
598     /// Waits for any of the requested ready states, returning a
599     /// [`AsyncFdReadyMutGuard`] that must be dropped to resume
600     /// polling for the requested ready states.
601     ///
602     /// The function may complete without the file descriptor being ready. This is a
603     /// false-positive and attempting an operation will return with
604     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
605     /// [`Ready`] set, so you should always check the returned value and possibly
606     /// wait again if the requested states are not set.
607     ///
608     /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
609     /// When a combined interest is used, it is important to clear only the readiness
610     /// that is actually observed to block. For instance when the combined
611     /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
612     /// read readiness should be cleared using the [`AsyncFdReadyMutGuard::clear_ready_matching`] method:
613     /// `guard.clear_ready_matching(Ready::READABLE)`.
614     /// Also clearing the write readiness in this case would be incorrect.
615     /// The [`AsyncFdReadyMutGuard::clear_ready`] method clears all readiness flags.
616     ///
617     /// This method takes `&mut self`, so it is possible to access the inner IO
618     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
619     ///
620     /// # Examples
621     ///
622     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
623     /// splitting.
624     ///
625     /// ```no_run
626     /// use std::error::Error;
627     /// use std::io;
628     /// use std::io::{Read, Write};
629     /// use std::net::TcpStream;
630     /// use tokio::io::unix::AsyncFd;
631     /// use tokio::io::{Interest, Ready};
632     ///
633     /// #[tokio::main]
634     /// async fn main() -> Result<(), Box<dyn Error>> {
635     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
636     ///     stream.set_nonblocking(true)?;
637     ///     let mut stream = AsyncFd::new(stream)?;
638     ///
639     ///     loop {
640     ///         let mut guard = stream
641     ///             .ready_mut(Interest::READABLE | Interest::WRITABLE)
642     ///             .await?;
643     ///
644     ///         if guard.ready().is_readable() {
645     ///             let mut data = vec![0; 1024];
646     ///             // Try to read data, this may still fail with `WouldBlock`
647     ///             // if the readiness event is a false positive.
648     ///             match guard.get_inner_mut().read(&mut data) {
649     ///                 Ok(n) => {
650     ///                     println!("read {} bytes", n);
651     ///                 }
652     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
653     ///                     // a read has blocked, but a write might still succeed.
654     ///                     // clear only the read readiness.
655     ///                     guard.clear_ready_matching(Ready::READABLE);
656     ///                     continue;
657     ///                 }
658     ///                 Err(e) => {
659     ///                     return Err(e.into());
660     ///                 }
661     ///             }
662     ///         }
663     ///
664     ///         if guard.ready().is_writable() {
665     ///             // Try to write data, this may still fail with `WouldBlock`
666     ///             // if the readiness event is a false positive.
667     ///             match guard.get_inner_mut().write(b"hello world") {
668     ///                 Ok(n) => {
669     ///                     println!("write {} bytes", n);
670     ///                 }
671     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
672     ///                     // a write has blocked, but a read might still succeed.
673     ///                     // clear only the write readiness.
674     ///                     guard.clear_ready_matching(Ready::WRITABLE);
675     ///                     continue;
676     ///                 }
677     ///                 Err(e) => {
678     ///                     return Err(e.into());
679     ///                 }
680     ///             }
681     ///         }
682     ///     }
683     /// }
684     /// ```
ready_mut( &mut self, interest: Interest, ) -> io::Result<AsyncFdReadyMutGuard<'_, T>>685     pub async fn ready_mut(
686         &mut self,
687         interest: Interest,
688     ) -> io::Result<AsyncFdReadyMutGuard<'_, T>> {
689         let event = self.registration.readiness(interest).await?;
690 
691         Ok(AsyncFdReadyMutGuard {
692             async_fd: self,
693             event: Some(event),
694         })
695     }
696 
697     /// Waits for the file descriptor to become readable, returning a
698     /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness
699     /// polling.
700     ///
701     /// This method takes `&self`, so it is possible to call this method
702     /// concurrently with other methods on this struct. This method only
703     /// provides shared access to the inner IO resource when handling the
704     /// [`AsyncFdReadyGuard`].
705     ///
706     /// # Cancel safety
707     ///
708     /// This method is cancel safe. Once a readiness event occurs, the method
709     /// will continue to return immediately until the readiness event is
710     /// consumed by an attempt to read or write that fails with `WouldBlock` or
711     /// `Poll::Pending`.
712     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>>713     pub async fn readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
714         self.ready(Interest::READABLE).await
715     }
716 
717     /// Waits for the file descriptor to become readable, returning a
718     /// [`AsyncFdReadyMutGuard`] that must be dropped to resume read-readiness
719     /// polling.
720     ///
721     /// This method takes `&mut self`, so it is possible to access the inner IO
722     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
723     ///
724     /// # Cancel safety
725     ///
726     /// This method is cancel safe. Once a readiness event occurs, the method
727     /// will continue to return immediately until the readiness event is
728     /// consumed by an attempt to read or write that fails with `WouldBlock` or
729     /// `Poll::Pending`.
730     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>>731     pub async fn readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
732         self.ready_mut(Interest::READABLE).await
733     }
734 
735     /// Waits for the file descriptor to become writable, returning a
736     /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness
737     /// polling.
738     ///
739     /// This method takes `&self`, so it is possible to call this method
740     /// concurrently with other methods on this struct. This method only
741     /// provides shared access to the inner IO resource when handling the
742     /// [`AsyncFdReadyGuard`].
743     ///
744     /// # Cancel safety
745     ///
746     /// This method is cancel safe. Once a readiness event occurs, the method
747     /// will continue to return immediately until the readiness event is
748     /// consumed by an attempt to read or write that fails with `WouldBlock` or
749     /// `Poll::Pending`.
750     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>>751     pub async fn writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
752         self.ready(Interest::WRITABLE).await
753     }
754 
755     /// Waits for the file descriptor to become writable, returning a
756     /// [`AsyncFdReadyMutGuard`] that must be dropped to resume write-readiness
757     /// polling.
758     ///
759     /// This method takes `&mut self`, so it is possible to access the inner IO
760     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
761     ///
762     /// # Cancel safety
763     ///
764     /// This method is cancel safe. Once a readiness event occurs, the method
765     /// will continue to return immediately until the readiness event is
766     /// consumed by an attempt to read or write that fails with `WouldBlock` or
767     /// `Poll::Pending`.
768     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>>769     pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
770         self.ready_mut(Interest::WRITABLE).await
771     }
772 
773     /// Reads or writes from the file descriptor using a user-provided IO operation.
774     ///
775     /// The `async_io` method is a convenience utility that waits for the file
776     /// descriptor to become ready, and then executes the provided IO operation.
777     /// Since file descriptors may be marked ready spuriously, the closure will
778     /// be called repeatedly until it returns something other than a
779     /// [`WouldBlock`] error. This is done using the following loop:
780     ///
781     /// ```no_run
782     /// # use std::io::{self, Result};
783     /// # struct Dox<T> { inner: T }
784     /// # impl<T> Dox<T> {
785     /// #     async fn writable(&self) -> Result<&Self> {
786     /// #         Ok(self)
787     /// #     }
788     /// #     fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> {
789     /// #         panic!()
790     /// #     }
791     /// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
792     ///     loop {
793     ///         // or `readable` if called with the read interest.
794     ///         let guard = self.writable().await?;
795     ///
796     ///         match guard.try_io(&mut f) {
797     ///             Ok(result) => return result,
798     ///             Err(_would_block) => continue,
799     ///         }
800     ///     }
801     /// }
802     /// # }
803     /// ```
804     ///
805     /// The closure should only return a [`WouldBlock`] error if it has performed
806     /// an IO operation on the file descriptor that failed due to the file descriptor not being
807     /// ready. Returning a [`WouldBlock`] error in any other situation will
808     /// incorrectly clear the readiness flag, which can cause the file descriptor to
809     /// behave incorrectly.
810     ///
811     /// The closure should not perform the IO operation using any of the methods
812     /// defined on the Tokio [`AsyncFd`] type, as this will mess with the
813     /// readiness flag and can cause the file descriptor to behave incorrectly.
814     ///
815     /// This method is not intended to be used with combined interests.
816     /// The closure should perform only one type of IO operation, so it should not
817     /// require more than one ready state. This method may panic or sleep forever
818     /// if it is called with a combined interest.
819     ///
820     /// # Examples
821     ///
822     /// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io`
823     /// method waits for readiness, and retries if the send operation does block. This example
824     /// is equivalent to the one given for [`try_io`].
825     ///
826     /// ```no_run
827     /// use tokio::io::{Interest, unix::AsyncFd};
828     ///
829     /// use std::io;
830     /// use std::net::UdpSocket;
831     ///
832     /// #[tokio::main]
833     /// async fn main() -> io::Result<()> {
834     ///     let socket = UdpSocket::bind("0.0.0.0:8080")?;
835     ///     socket.set_nonblocking(true)?;
836     ///     let async_fd = AsyncFd::new(socket)?;
837     ///
838     ///     let written = async_fd
839     ///         .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
840     ///         .await?;
841     ///
842     ///     println!("wrote {written} bytes");
843     ///
844     ///     Ok(())
845     /// }
846     /// ```
847     ///
848     /// [`try_io`]: AsyncFdReadyGuard::try_io
849     /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
async_io<R>( &self, interest: Interest, mut f: impl FnMut(&T) -> io::Result<R>, ) -> io::Result<R>850     pub async fn async_io<R>(
851         &self,
852         interest: Interest,
853         mut f: impl FnMut(&T) -> io::Result<R>,
854     ) -> io::Result<R> {
855         self.registration
856             .async_io(interest, || f(self.get_ref()))
857             .await
858     }
859 
860     /// Reads or writes from the file descriptor using a user-provided IO operation.
861     ///
862     /// The behavior is the same as [`async_io`], except that the closure can mutate the inner
863     /// value of the [`AsyncFd`].
864     ///
865     /// [`async_io`]: AsyncFd::async_io
async_io_mut<R>( &mut self, interest: Interest, mut f: impl FnMut(&mut T) -> io::Result<R>, ) -> io::Result<R>866     pub async fn async_io_mut<R>(
867         &mut self,
868         interest: Interest,
869         mut f: impl FnMut(&mut T) -> io::Result<R>,
870     ) -> io::Result<R> {
871         self.registration
872             .async_io(interest, || f(self.inner.as_mut().unwrap()))
873             .await
874     }
875 
876     /// Tries to read or write from the file descriptor using a user-provided IO operation.
877     ///
878     /// If the file descriptor is ready, the provided closure is called. The closure
879     /// should attempt to perform IO operation on the file descriptor by manually
880     /// calling the appropriate syscall. If the operation fails because the
881     /// file descriptor is not actually ready, then the closure should return a
882     /// `WouldBlock` error and the readiness flag is cleared. The return value
883     /// of the closure is then returned by `try_io`.
884     ///
885     /// If the file descriptor is not ready, then the closure is not called
886     /// and a `WouldBlock` error is returned.
887     ///
888     /// The closure should only return a `WouldBlock` error if it has performed
889     /// an IO operation on the file descriptor that failed due to the file descriptor not being
890     /// ready. Returning a `WouldBlock` error in any other situation will
891     /// incorrectly clear the readiness flag, which can cause the file descriptor to
892     /// behave incorrectly.
893     ///
894     /// The closure should not perform the IO operation using any of the methods
895     /// defined on the Tokio `AsyncFd` type, as this will mess with the
896     /// readiness flag and can cause the file descriptor to behave incorrectly.
897     ///
898     /// This method is not intended to be used with combined interests.
899     /// The closure should perform only one type of IO operation, so it should not
900     /// require more than one ready state. This method may panic or sleep forever
901     /// if it is called with a combined interest.
try_io<R>( &self, interest: Interest, f: impl FnOnce(&T) -> io::Result<R>, ) -> io::Result<R>902     pub fn try_io<R>(
903         &self,
904         interest: Interest,
905         f: impl FnOnce(&T) -> io::Result<R>,
906     ) -> io::Result<R> {
907         self.registration
908             .try_io(interest, || f(self.inner.as_ref().unwrap()))
909     }
910 
911     /// Tries to read or write from the file descriptor using a user-provided IO operation.
912     ///
913     /// The behavior is the same as [`try_io`], except that the closure can mutate the inner
914     /// value of the [`AsyncFd`].
915     ///
916     /// [`try_io`]: AsyncFd::try_io
try_io_mut<R>( &mut self, interest: Interest, f: impl FnOnce(&mut T) -> io::Result<R>, ) -> io::Result<R>917     pub fn try_io_mut<R>(
918         &mut self,
919         interest: Interest,
920         f: impl FnOnce(&mut T) -> io::Result<R>,
921     ) -> io::Result<R> {
922         self.registration
923             .try_io(interest, || f(self.inner.as_mut().unwrap()))
924     }
925 }
926 
927 impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
as_raw_fd(&self) -> RawFd928     fn as_raw_fd(&self) -> RawFd {
929         self.inner.as_ref().unwrap().as_raw_fd()
930     }
931 }
932 
impl<T: AsRawFd> std::os::unix::io::AsFd for AsyncFd<T> {
    /// Borrows the file descriptor of the wrapped inner object; the returned
    /// `BorrowedFd` is tied to the borrow of `self`.
    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
        // SAFETY: `borrow_raw` requires the descriptor to stay open for the
        // lifetime of the returned `BorrowedFd`, which here is bounded by the
        // borrow of `self`. This relies on the inner object keeping the same
        // descriptor open for as long as this `AsyncFd` owns it, as the
        // type-level documentation requires.
        // NOTE(review): that contract is documented, not enforced by the
        // `T: AsRawFd` bound — confirm it is acceptable here.
        unsafe { std::os::unix::io::BorrowedFd::borrow_raw(self.as_raw_fd()) }
    }
}
938 
939 impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result940     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
941         f.debug_struct("AsyncFd")
942             .field("inner", &self.inner)
943             .finish()
944     }
945 }
946 
947 impl<T: AsRawFd> Drop for AsyncFd<T> {
drop(&mut self)948     fn drop(&mut self) {
949         let _ = self.take_inner();
950     }
951 }
952 
953 impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
954     /// Indicates to tokio that the file descriptor is no longer ready. All
955     /// internal readiness flags will be cleared, and tokio will wait for the
956     /// next edge-triggered readiness notification from the OS.
957     ///
958     /// This function is commonly used with guards returned by [`AsyncFd::readable`] and
959     /// [`AsyncFd::writable`].
960     ///
961     /// It is critical that this function not be called unless your code
962     /// _actually observes_ that the file descriptor is _not_ ready. Do not call
963     /// it simply because, for example, a read succeeded; it should be called
964     /// when a read is observed to block.
965     ///
966     /// This method only clears readiness events that happened before the creation of this guard.
967     /// In other words, if the IO resource becomes ready between the creation of the guard and
968     /// this call to `clear_ready`, then the readiness is not actually cleared.
clear_ready(&mut self)969     pub fn clear_ready(&mut self) {
970         if let Some(event) = self.event.take() {
971             self.async_fd.registration.clear_readiness(event);
972         }
973     }
974 
975     /// Indicates to tokio that the file descriptor no longer has a specific readiness.
976     /// The internal readiness flag will be cleared, and tokio will wait for the
977     /// next edge-triggered readiness notification from the OS.
978     ///
979     /// This function is useful in combination with the [`AsyncFd::ready`] method when a
980     /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
981     ///
982     /// It is critical that this function not be called unless your code
983     /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
984     /// Do not call it simply because, for example, a read succeeded; it should be called
985     /// when a read is observed to block. Only clear the specific readiness that is observed to
986     /// block. For example when a read blocks when using a combined interest,
987     /// only clear `Ready::READABLE`.
988     ///
989     /// This method only clears readiness events that happened before the creation of this guard.
990     /// In other words, if the IO resource becomes ready between the creation of the guard and
    /// this call to `clear_ready_matching`, then the readiness is not actually cleared.
992     ///
993     /// # Examples
994     ///
995     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
996     /// splitting.
997     ///
998     /// ```no_run
999     /// use std::error::Error;
1000     /// use std::io;
1001     /// use std::io::{Read, Write};
1002     /// use std::net::TcpStream;
1003     /// use tokio::io::unix::AsyncFd;
1004     /// use tokio::io::{Interest, Ready};
1005     ///
1006     /// #[tokio::main]
1007     /// async fn main() -> Result<(), Box<dyn Error>> {
1008     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
1009     ///     stream.set_nonblocking(true)?;
1010     ///     let stream = AsyncFd::new(stream)?;
1011     ///
1012     ///     loop {
1013     ///         let mut guard = stream
1014     ///             .ready(Interest::READABLE | Interest::WRITABLE)
1015     ///             .await?;
1016     ///
1017     ///         if guard.ready().is_readable() {
1018     ///             let mut data = vec![0; 1024];
1019     ///             // Try to read data, this may still fail with `WouldBlock`
1020     ///             // if the readiness event is a false positive.
1021     ///             match stream.get_ref().read(&mut data) {
1022     ///                 Ok(n) => {
1023     ///                     println!("read {} bytes", n);
1024     ///                 }
1025     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1026     ///                     // a read has blocked, but a write might still succeed.
1027     ///                     // clear only the read readiness.
1028     ///                     guard.clear_ready_matching(Ready::READABLE);
1029     ///                     continue;
1030     ///                 }
1031     ///                 Err(e) => {
1032     ///                     return Err(e.into());
1033     ///                 }
1034     ///             }
1035     ///         }
1036     ///
1037     ///         if guard.ready().is_writable() {
1038     ///             // Try to write data, this may still fail with `WouldBlock`
1039     ///             // if the readiness event is a false positive.
1040     ///             match stream.get_ref().write(b"hello world") {
1041     ///                 Ok(n) => {
1042     ///                     println!("write {} bytes", n);
1043     ///                 }
1044     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1045     ///                     // a write has blocked, but a read might still succeed.
1046     ///                     // clear only the write readiness.
1047     ///                     guard.clear_ready_matching(Ready::WRITABLE);
1048     ///                     continue;
1049     ///                 }
1050     ///                 Err(e) => {
1051     ///                     return Err(e.into());
1052     ///                 }
1053     ///             }
1054     ///         }
1055     ///     }
1056     /// }
1057     /// ```
clear_ready_matching(&mut self, ready: Ready)1058     pub fn clear_ready_matching(&mut self, ready: Ready) {
1059         if let Some(mut event) = self.event.take() {
1060             self.async_fd
1061                 .registration
1062                 .clear_readiness(event.with_ready(ready));
1063 
1064             // the event is no longer ready for the readiness that was just cleared
1065             event.ready = event.ready - ready;
1066 
1067             if !event.ready.is_empty() {
1068                 self.event = Some(event);
1069             }
1070         }
1071     }
1072 
1073     /// This method should be invoked when you intentionally want to keep the
1074     /// ready flag asserted.
1075     ///
1076     /// While this function is itself a no-op, it satisfies the `#[must_use]`
1077     /// constraint on the [`AsyncFdReadyGuard`] type.
    pub fn retain_ready(&mut self) {
        // no-op: intentionally empty, the stored event is left untouched so
        // the guard keeps reporting its current readiness.
    }
1081 
1082     /// Get the [`Ready`] value associated with this guard.
1083     ///
1084     /// This method will return the empty readiness state if
1085     /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1086     /// the guard.
1087     ///
1088     /// [`Ready`]: crate::io::Ready
ready(&self) -> Ready1089     pub fn ready(&self) -> Ready {
1090         match &self.event {
1091             Some(event) => event.ready,
1092             None => Ready::EMPTY,
1093         }
1094     }
1095 
1096     /// Performs the provided IO operation.
1097     ///
1098     /// If `f` returns a [`WouldBlock`] error, the readiness state associated
1099     /// with this file descriptor is cleared, and the method returns
    /// `Err(TryIoError)`. You will typically need to poll the
1101     /// `AsyncFd` again when this happens.
1102     ///
1103     /// This method helps ensure that the readiness state of the underlying file
1104     /// descriptor remains in sync with the tokio-side readiness state, by
1105     /// clearing the tokio-side state only when a [`WouldBlock`] condition
1106     /// occurs. It is the responsibility of the caller to ensure that `f`
1107     /// returns [`WouldBlock`] only if the file descriptor that originated this
1108     /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
1109     /// create this `AsyncFdReadyGuard`.
1110     ///
1111     /// # Examples
1112     ///
1113     /// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting
1114     /// for write-readiness and retrying when the send operation does block are explicit.
1115     /// This example can be written more succinctly using [`AsyncFd::async_io`].
1116     ///
1117     /// ```no_run
1118     /// use tokio::io::unix::AsyncFd;
1119     ///
1120     /// use std::io;
1121     /// use std::net::UdpSocket;
1122     ///
1123     /// #[tokio::main]
1124     /// async fn main() -> io::Result<()> {
1125     ///     let socket = UdpSocket::bind("0.0.0.0:8080")?;
1126     ///     socket.set_nonblocking(true)?;
1127     ///     let async_fd = AsyncFd::new(socket)?;
1128     ///
1129     ///     let written = loop {
1130     ///         let mut guard = async_fd.writable().await?;
1131     ///         match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) {
1132     ///             Ok(result) => {
1133     ///                 break result?;
1134     ///             }
1135     ///             Err(_would_block) => {
1136     ///                 // try_io already cleared the file descriptor's readiness state
1137     ///                 continue;
1138     ///             }
1139     ///         }
1140     ///     };
1141     ///
1142     ///     println!("wrote {written} bytes");
1143     ///
1144     ///     Ok(())
1145     /// }
1146     /// ```
1147     ///
1148     /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1149     // Alias for old name in 0.x
1150     #[cfg_attr(docsrs, doc(alias = "with_io"))]
try_io<R>( &mut self, f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError>1151     pub fn try_io<R>(
1152         &mut self,
1153         f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>,
1154     ) -> Result<io::Result<R>, TryIoError> {
1155         let result = f(self.async_fd);
1156 
1157         match result {
1158             Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1159                 self.clear_ready();
1160                 Err(TryIoError(()))
1161             }
1162             result => Ok(result),
1163         }
1164     }
1165 
    /// Returns a shared reference to the inner [`AsyncFd`].
    ///
    /// The returned reference carries the guard's own lifetime `'a`, so it is
    /// not tied to this borrow of the guard.
    pub fn get_ref(&self) -> &'a AsyncFd<Inner> {
        self.async_fd
    }
1170 
1171     /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
get_inner(&self) -> &'a Inner1172     pub fn get_inner(&self) -> &'a Inner {
1173         self.get_ref().get_ref()
1174     }
1175 }
1176 
1177 impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
1178     /// Indicates to tokio that the file descriptor is no longer ready. All
1179     /// internal readiness flags will be cleared, and tokio will wait for the
1180     /// next edge-triggered readiness notification from the OS.
1181     ///
1182     /// This function is commonly used with guards returned by [`AsyncFd::readable_mut`] and
1183     /// [`AsyncFd::writable_mut`].
1184     ///
1185     /// It is critical that this function not be called unless your code
1186     /// _actually observes_ that the file descriptor is _not_ ready. Do not call
1187     /// it simply because, for example, a read succeeded; it should be called
1188     /// when a read is observed to block.
1189     ///
1190     /// This method only clears readiness events that happened before the creation of this guard.
1191     /// In other words, if the IO resource becomes ready between the creation of the guard and
1192     /// this call to `clear_ready`, then the readiness is not actually cleared.
clear_ready(&mut self)1193     pub fn clear_ready(&mut self) {
1194         if let Some(event) = self.event.take() {
1195             self.async_fd.registration.clear_readiness(event);
1196         }
1197     }
1198 
1199     /// Indicates to tokio that the file descriptor no longer has a specific readiness.
1200     /// The internal readiness flag will be cleared, and tokio will wait for the
1201     /// next edge-triggered readiness notification from the OS.
1202     ///
1203     /// This function is useful in combination with the [`AsyncFd::ready_mut`] method when a
1204     /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
1205     ///
1206     /// It is critical that this function not be called unless your code
1207     /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
1208     /// Do not call it simply because, for example, a read succeeded; it should be called
1209     /// when a read is observed to block. Only clear the specific readiness that is observed to
1210     /// block. For example when a read blocks when using a combined interest,
1211     /// only clear `Ready::READABLE`.
1212     ///
1213     /// This method only clears readiness events that happened before the creation of this guard.
1214     /// In other words, if the IO resource becomes ready between the creation of the guard and
    /// this call to `clear_ready_matching`, then the readiness is not actually cleared.
1216     ///
1217     /// # Examples
1218     ///
1219     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
1220     /// splitting.
1221     ///
1222     /// ```no_run
1223     /// use std::error::Error;
1224     /// use std::io;
1225     /// use std::io::{Read, Write};
1226     /// use std::net::TcpStream;
1227     /// use tokio::io::unix::AsyncFd;
1228     /// use tokio::io::{Interest, Ready};
1229     ///
1230     /// #[tokio::main]
1231     /// async fn main() -> Result<(), Box<dyn Error>> {
1232     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
1233     ///     stream.set_nonblocking(true)?;
1234     ///     let mut stream = AsyncFd::new(stream)?;
1235     ///
1236     ///     loop {
1237     ///         let mut guard = stream
1238     ///             .ready_mut(Interest::READABLE | Interest::WRITABLE)
1239     ///             .await?;
1240     ///
1241     ///         if guard.ready().is_readable() {
1242     ///             let mut data = vec![0; 1024];
1243     ///             // Try to read data, this may still fail with `WouldBlock`
1244     ///             // if the readiness event is a false positive.
1245     ///             match guard.get_inner_mut().read(&mut data) {
1246     ///                 Ok(n) => {
1247     ///                     println!("read {} bytes", n);
1248     ///                 }
1249     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1250     ///                     // a read has blocked, but a write might still succeed.
1251     ///                     // clear only the read readiness.
1252     ///                     guard.clear_ready_matching(Ready::READABLE);
1253     ///                     continue;
1254     ///                 }
1255     ///                 Err(e) => {
1256     ///                     return Err(e.into());
1257     ///                 }
1258     ///             }
1259     ///         }
1260     ///
1261     ///         if guard.ready().is_writable() {
1262     ///             // Try to write data, this may still fail with `WouldBlock`
1263     ///             // if the readiness event is a false positive.
1264     ///             match guard.get_inner_mut().write(b"hello world") {
1265     ///                 Ok(n) => {
1266     ///                     println!("write {} bytes", n);
1267     ///                 }
1268     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1269     ///                     // a write has blocked, but a read might still succeed.
1270     ///                     // clear only the write readiness.
1271     ///                     guard.clear_ready_matching(Ready::WRITABLE);
1272     ///                     continue;
1273     ///                 }
1274     ///                 Err(e) => {
1275     ///                     return Err(e.into());
1276     ///                 }
1277     ///             }
1278     ///         }
1279     ///     }
1280     /// }
1281     /// ```
clear_ready_matching(&mut self, ready: Ready)1282     pub fn clear_ready_matching(&mut self, ready: Ready) {
1283         if let Some(mut event) = self.event.take() {
1284             self.async_fd
1285                 .registration
1286                 .clear_readiness(event.with_ready(ready));
1287 
1288             // the event is no longer ready for the readiness that was just cleared
1289             event.ready = event.ready - ready;
1290 
1291             if !event.ready.is_empty() {
1292                 self.event = Some(event);
1293             }
1294         }
1295     }
1296 
    /// This method should be invoked when you intentionally want to keep the
    /// ready flag asserted.
    ///
    /// While this function is itself a no-op, it satisfies the `#[must_use]`
    /// constraint on the [`AsyncFdReadyMutGuard`] type.
    pub fn retain_ready(&mut self) {
        // no-op: intentionally empty, the stored event is left untouched so
        // the guard keeps reporting its current readiness.
    }
1305 
1306     /// Get the [`Ready`] value associated with this guard.
1307     ///
1308     /// This method will return the empty readiness state if
    /// [`AsyncFdReadyMutGuard::clear_ready`] has been called on
1310     /// the guard.
1311     ///
1312     /// [`Ready`]: super::Ready
ready(&self) -> Ready1313     pub fn ready(&self) -> Ready {
1314         match &self.event {
1315             Some(event) => event.ready,
1316             None => Ready::EMPTY,
1317         }
1318     }
1319 
1320     /// Performs the provided IO operation.
1321     ///
1322     /// If `f` returns a [`WouldBlock`] error, the readiness state associated
1323     /// with this file descriptor is cleared, and the method returns
    /// `Err(TryIoError)`. You will typically need to poll the
1325     /// `AsyncFd` again when this happens.
1326     ///
1327     /// This method helps ensure that the readiness state of the underlying file
1328     /// descriptor remains in sync with the tokio-side readiness state, by
1329     /// clearing the tokio-side state only when a [`WouldBlock`] condition
1330     /// occurs. It is the responsibility of the caller to ensure that `f`
1331     /// returns [`WouldBlock`] only if the file descriptor that originated this
    /// `AsyncFdReadyMutGuard` no longer expresses the readiness state that was queried to
    /// create this `AsyncFdReadyMutGuard`.
1334     ///
1335     /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
try_io<R>( &mut self, f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError>1336     pub fn try_io<R>(
1337         &mut self,
1338         f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>,
1339     ) -> Result<io::Result<R>, TryIoError> {
1340         let result = f(self.async_fd);
1341 
1342         match result {
1343             Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1344                 self.clear_ready();
1345                 Err(TryIoError(()))
1346             }
1347             result => Ok(result),
1348         }
1349     }
1350 
1351     /// Returns a shared reference to the inner [`AsyncFd`].
get_ref(&self) -> &AsyncFd<Inner>1352     pub fn get_ref(&self) -> &AsyncFd<Inner> {
1353         self.async_fd
1354     }
1355 
1356     /// Returns a mutable reference to the inner [`AsyncFd`].
get_mut(&mut self) -> &mut AsyncFd<Inner>1357     pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> {
1358         self.async_fd
1359     }
1360 
1361     /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
get_inner(&self) -> &Inner1362     pub fn get_inner(&self) -> &Inner {
1363         self.get_ref().get_ref()
1364     }
1365 
1366     /// Returns a mutable reference to the backing object of the inner [`AsyncFd`].
get_inner_mut(&mut self) -> &mut Inner1367     pub fn get_inner_mut(&mut self) -> &mut Inner {
1368         self.get_mut().get_mut()
1369     }
1370 }
1371 
1372 impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1373     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1374         f.debug_struct("ReadyGuard")
1375             .field("async_fd", &self.async_fd)
1376             .finish()
1377     }
1378 }
1379 
1380 impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<'a, T> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1381     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1382         f.debug_struct("MutReadyGuard")
1383             .field("async_fd", &self.async_fd)
1384             .finish()
1385     }
1386 }
1387 
/// The error type returned by [`try_io`].
///
/// This error indicates that the IO resource returned a [`WouldBlock`] error.
/// It carries no payload: the only information it conveys is that the
/// operation would have blocked. Its single field is private, so values of
/// this type cannot be constructed outside this module.
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
/// [`try_io`]: method@AsyncFdReadyGuard::try_io
#[derive(Debug)]
pub struct TryIoError(());
1396 
/// Error returned by [`try_new`] or [`try_with_interest`].
///
/// The error keeps hold of the object that was passed to the failing
/// constructor together with the underlying [`io::Error`], so that both can
/// be recovered with [`into_parts`].
///
/// [`try_new`]: AsyncFd::try_new
/// [`try_with_interest`]: AsyncFd::try_with_interest
/// [`into_parts`]: AsyncFdTryNewError::into_parts
pub struct AsyncFdTryNewError<T> {
    // The original object handed to the constructor; returned by `into_parts`.
    inner: T,
    // The IO error that made the constructor fail; used for `Display`,
    // `Debug`, `Error::source` and the conversion into `io::Error`.
    cause: io::Error,
}
1405 
1406 impl<T> AsyncFdTryNewError<T> {
1407     /// Returns the original object passed to [`try_new`] or [`try_with_interest`]
1408     /// alongside the error that caused these functions to fail.
1409     ///
1410     /// [`try_new`]: AsyncFd::try_new
1411     /// [`try_with_interest`]: AsyncFd::try_with_interest
into_parts(self) -> (T, io::Error)1412     pub fn into_parts(self) -> (T, io::Error) {
1413         (self.inner, self.cause)
1414     }
1415 }
1416 
1417 impl<T> fmt::Display for AsyncFdTryNewError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1418     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1419         fmt::Display::fmt(&self.cause, f)
1420     }
1421 }
1422 
1423 impl<T> fmt::Debug for AsyncFdTryNewError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1424     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1425         fmt::Debug::fmt(&self.cause, f)
1426     }
1427 }
1428 
1429 impl<T> Error for AsyncFdTryNewError<T> {
source(&self) -> Option<&(dyn Error + 'static)>1430     fn source(&self) -> Option<&(dyn Error + 'static)> {
1431         Some(&self.cause)
1432     }
1433 }
1434 
1435 impl<T> From<AsyncFdTryNewError<T>> for io::Error {
from(value: AsyncFdTryNewError<T>) -> Self1436     fn from(value: AsyncFdTryNewError<T>) -> Self {
1437         value.cause
1438     }
1439 }
1440