1 #![cfg_attr(not(feature = "net"), allow(dead_code))]
2 
3 use crate::io::interest::Interest;
4 use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo};
5 use crate::runtime::scheduler;
6 
7 use mio::event::Source;
8 use std::io;
9 use std::sync::Arc;
10 use std::task::{ready, Context, Poll};
11 
cfg_io_driver! {
    /// Associates an I/O resource with the reactor instance that drives it.
    ///
    /// A registration represents an I/O resource registered with a Reactor such
    /// that it will receive task notifications on readiness. This is the lowest
    /// level API for integrating with a reactor.
    ///
    /// The association between an I/O resource and a reactor is made by calling
    /// [`new_with_interest_and_handle`].
    /// Once the association is established, it remains established until the
    /// registration instance is dropped.
    ///
    /// A registration instance represents two separate readiness streams. One
    /// for the read readiness and one for write readiness. These streams are
    /// independent and can be consumed from separate tasks.
    ///
    /// **Note**: while `Registration` is `Sync`, the caller must ensure that
    /// there are at most two tasks that use a registration instance
    /// concurrently. One task for [`poll_read_ready`] and one task for
    /// [`poll_write_ready`]. While violating this requirement is "safe" from a
    /// Rust memory safety point of view, it will result in unexpected behavior
    /// in the form of lost notifications and tasks hanging.
    ///
    /// ## Platform-specific events
    ///
    /// `Registration` also allows receiving platform-specific `mio::Ready`
    /// events. These events are included as part of the read readiness event
    /// stream. The write readiness event stream is only for `Ready::writable()`
    /// events.
    ///
    /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    #[derive(Debug)]
    pub(crate) struct Registration {
        /// Handle to the associated runtime.
        ///
        /// TODO: this can probably be moved into `ScheduledIo`.
        handle: scheduler::Handle,

        /// Reference to state stored by the driver.
        shared: Arc<ScheduledIo>,
    }
}
56 
// NOTE(review): `Send` and `Sync` are asserted by hand here instead of being
// derived from the field types. The type-level docs on `Registration` state
// that concurrent misuse stays memory-safe (it only risks lost notifications
// and hanging tasks). The soundness argument presumably rests on
// `scheduler::Handle` and `ScheduledIo` being safe to share across threads —
// TODO confirm and record it as a proper `SAFETY:` justification.
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
59 
60 // ===== impl Registration =====
61 
62 impl Registration {
63     /// Registers the I/O resource with the reactor for the provided handle, for
64     /// a specific `Interest`. This does not add `hup` or `error` so if you are
65     /// interested in those states, you will need to add them to the readiness
66     /// state passed to this function.
67     ///
68     /// # Return
69     ///
70     /// - `Ok` if the registration happened successfully
71     /// - `Err` if an error was encountered during registration
72     #[track_caller]
new_with_interest_and_handle( io: &mut impl Source, interest: Interest, handle: scheduler::Handle, ) -> io::Result<Registration>73     pub(crate) fn new_with_interest_and_handle(
74         io: &mut impl Source,
75         interest: Interest,
76         handle: scheduler::Handle,
77     ) -> io::Result<Registration> {
78         let shared = handle.driver().io().add_source(io, interest)?;
79 
80         Ok(Registration { handle, shared })
81     }
82 
83     /// Deregisters the I/O resource from the reactor it is associated with.
84     ///
85     /// This function must be called before the I/O resource associated with the
86     /// registration is dropped.
87     ///
88     /// Note that deregistering does not guarantee that the I/O resource can be
89     /// registered with a different reactor. Some I/O resource types can only be
90     /// associated with a single reactor instance for their lifetime.
91     ///
92     /// # Return
93     ///
94     /// If the deregistration was successful, `Ok` is returned. Any calls to
95     /// `Reactor::turn` that happen after a successful call to `deregister` will
96     /// no longer result in notifications getting sent for this registration.
97     ///
98     /// `Err` is returned if an error is encountered.
deregister(&mut self, io: &mut impl Source) -> io::Result<()>99     pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
100         self.handle().deregister_source(&self.shared, io)
101     }
102 
clear_readiness(&self, event: ReadyEvent)103     pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
104         self.shared.clear_readiness(event);
105     }
106 
107     // Uses the poll path, requiring the caller to ensure mutual exclusion for
108     // correctness. Only the last task to call this function is notified.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>>109     pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
110         self.poll_ready(cx, Direction::Read)
111     }
112 
113     // Uses the poll path, requiring the caller to ensure mutual exclusion for
114     // correctness. Only the last task to call this function is notified.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>>115     pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
116         self.poll_ready(cx, Direction::Write)
117     }
118 
119     // Uses the poll path, requiring the caller to ensure mutual exclusion for
120     // correctness. Only the last task to call this function is notified.
121     #[cfg(not(target_os = "wasi"))]
poll_read_io<R>( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>122     pub(crate) fn poll_read_io<R>(
123         &self,
124         cx: &mut Context<'_>,
125         f: impl FnMut() -> io::Result<R>,
126     ) -> Poll<io::Result<R>> {
127         self.poll_io(cx, Direction::Read, f)
128     }
129 
130     // Uses the poll path, requiring the caller to ensure mutual exclusion for
131     // correctness. Only the last task to call this function is notified.
poll_write_io<R>( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>132     pub(crate) fn poll_write_io<R>(
133         &self,
134         cx: &mut Context<'_>,
135         f: impl FnMut() -> io::Result<R>,
136     ) -> Poll<io::Result<R>> {
137         self.poll_io(cx, Direction::Write, f)
138     }
139 
140     /// Polls for events on the I/O resource's `direction` readiness stream.
141     ///
142     /// If called with a task context, notify the task when a new event is
143     /// received.
poll_ready( &self, cx: &mut Context<'_>, direction: Direction, ) -> Poll<io::Result<ReadyEvent>>144     fn poll_ready(
145         &self,
146         cx: &mut Context<'_>,
147         direction: Direction,
148     ) -> Poll<io::Result<ReadyEvent>> {
149         ready!(crate::trace::trace_leaf(cx));
150         // Keep track of task budget
151         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
152         let ev = ready!(self.shared.poll_readiness(cx, direction));
153 
154         if ev.is_shutdown {
155             return Poll::Ready(Err(gone()));
156         }
157 
158         coop.made_progress();
159         Poll::Ready(Ok(ev))
160     }
161 
poll_io<R>( &self, cx: &mut Context<'_>, direction: Direction, mut f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>162     fn poll_io<R>(
163         &self,
164         cx: &mut Context<'_>,
165         direction: Direction,
166         mut f: impl FnMut() -> io::Result<R>,
167     ) -> Poll<io::Result<R>> {
168         loop {
169             let ev = ready!(self.poll_ready(cx, direction))?;
170 
171             match f() {
172                 Ok(ret) => {
173                     return Poll::Ready(Ok(ret));
174                 }
175                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176                     self.clear_readiness(ev);
177                 }
178                 Err(e) => return Poll::Ready(Err(e)),
179             }
180         }
181     }
182 
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>183     pub(crate) fn try_io<R>(
184         &self,
185         interest: Interest,
186         f: impl FnOnce() -> io::Result<R>,
187     ) -> io::Result<R> {
188         let ev = self.shared.ready_event(interest);
189 
190         // Don't attempt the operation if the resource is not ready.
191         if ev.ready.is_empty() {
192             return Err(io::ErrorKind::WouldBlock.into());
193         }
194 
195         match f() {
196             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
197                 self.clear_readiness(ev);
198                 Err(io::ErrorKind::WouldBlock.into())
199             }
200             res => res,
201         }
202     }
203 
readiness(&self, interest: Interest) -> io::Result<ReadyEvent>204     pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
205         let ev = self.shared.readiness(interest).await;
206 
207         if ev.is_shutdown {
208             return Err(gone());
209         }
210 
211         Ok(ev)
212     }
213 
async_io<R>( &self, interest: Interest, mut f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>214     pub(crate) async fn async_io<R>(
215         &self,
216         interest: Interest,
217         mut f: impl FnMut() -> io::Result<R>,
218     ) -> io::Result<R> {
219         loop {
220             let event = self.readiness(interest).await?;
221 
222             let coop = std::future::poll_fn(crate::runtime::coop::poll_proceed).await;
223 
224             match f() {
225                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
226                     self.clear_readiness(event);
227                 }
228                 x => {
229                     coop.made_progress();
230                     return x;
231                 }
232             }
233         }
234     }
235 
handle(&self) -> &Handle236     fn handle(&self) -> &Handle {
237         self.handle.driver().io()
238     }
239 }
240 
241 impl Drop for Registration {
drop(&mut self)242     fn drop(&mut self) {
243         // It is possible for a cycle to be created between wakers stored in
244         // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
245         // cycle, wakers are cleared. This is an imperfect solution as it is
246         // possible to store a `Registration` in a waker. In this case, the
247         // cycle would remain.
248         //
249         // See tokio-rs/tokio#3481 for more details.
250         self.shared.clear_wakers();
251     }
252 }
253 
gone() -> io::Error254 fn gone() -> io::Error {
255     io::Error::new(
256         io::ErrorKind::Other,
257         crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
258     )
259 }
260