1 //! Parks the runtime.
2 //!
3 //! A combination of the various resource driver park handles.
4 
5 use crate::loom::sync::atomic::AtomicUsize;
6 use crate::loom::sync::{Arc, Condvar, Mutex};
7 use crate::runtime::driver::{self, Driver};
8 use crate::util::TryLock;
9 
10 use std::sync::atomic::Ordering::SeqCst;
11 use std::time::Duration;
12 
13 #[cfg(loom)]
14 use crate::runtime::park::CURRENT_THREAD_PARK_COUNT;
15 
16 pub(crate) struct Parker {
17     inner: Arc<Inner>,
18 }
19 
20 pub(crate) struct Unparker {
21     inner: Arc<Inner>,
22 }
23 
24 struct Inner {
25     /// Avoids entering the park if possible
26     state: AtomicUsize,
27 
28     /// Used to coordinate access to the driver / `condvar`
29     mutex: Mutex<()>,
30 
31     /// `Condvar` to block on if the driver is unavailable.
32     condvar: Condvar,
33 
34     /// Resource (I/O, time, ...) driver
35     shared: Arc<Shared>,
36 }
37 
38 const EMPTY: usize = 0;
39 const PARKED_CONDVAR: usize = 1;
40 const PARKED_DRIVER: usize = 2;
41 const NOTIFIED: usize = 3;
42 
43 /// Shared across multiple Parker handles
44 struct Shared {
45     /// Shared driver. Only one thread at a time can use this
46     driver: TryLock<Driver>,
47 }
48 
49 impl Parker {
new(driver: Driver) -> Parker50     pub(crate) fn new(driver: Driver) -> Parker {
51         Parker {
52             inner: Arc::new(Inner {
53                 state: AtomicUsize::new(EMPTY),
54                 mutex: Mutex::new(()),
55                 condvar: Condvar::new(),
56                 shared: Arc::new(Shared {
57                     driver: TryLock::new(driver),
58                 }),
59             }),
60         }
61     }
62 
unpark(&self) -> Unparker63     pub(crate) fn unpark(&self) -> Unparker {
64         Unparker {
65             inner: self.inner.clone(),
66         }
67     }
68 
park(&mut self, handle: &driver::Handle)69     pub(crate) fn park(&mut self, handle: &driver::Handle) {
70         self.inner.park(handle);
71     }
72 
park_timeout(&mut self, handle: &driver::Handle, duration: Duration)73     pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
74         // Only parking with zero is supported...
75         assert_eq!(duration, Duration::from_millis(0));
76 
77         if let Some(mut driver) = self.inner.shared.driver.try_lock() {
78             driver.park_timeout(handle, duration);
79         } else {
80             // https://github.com/tokio-rs/tokio/issues/6536
81             // Hacky, but it's just for loom tests. The counter gets incremented during
82             // `park_timeout`, but we still have to increment the counter if we can't acquire the
83             // lock.
84             #[cfg(loom)]
85             CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
86         }
87     }
88 
shutdown(&mut self, handle: &driver::Handle)89     pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
90         self.inner.shutdown(handle);
91     }
92 }
93 
94 impl Clone for Parker {
clone(&self) -> Parker95     fn clone(&self) -> Parker {
96         Parker {
97             inner: Arc::new(Inner {
98                 state: AtomicUsize::new(EMPTY),
99                 mutex: Mutex::new(()),
100                 condvar: Condvar::new(),
101                 shared: self.inner.shared.clone(),
102             }),
103         }
104     }
105 }
106 
107 impl Unparker {
unpark(&self, driver: &driver::Handle)108     pub(crate) fn unpark(&self, driver: &driver::Handle) {
109         self.inner.unpark(driver);
110     }
111 }
112 
113 impl Inner {
114     /// Parks the current thread for at most `dur`.
park(&self, handle: &driver::Handle)115     fn park(&self, handle: &driver::Handle) {
116         // If we were previously notified then we consume this notification and
117         // return quickly.
118         if self
119             .state
120             .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
121             .is_ok()
122         {
123             return;
124         }
125 
126         if let Some(mut driver) = self.shared.driver.try_lock() {
127             self.park_driver(&mut driver, handle);
128         } else {
129             self.park_condvar();
130         }
131     }
132 
park_condvar(&self)133     fn park_condvar(&self) {
134         // Otherwise we need to coordinate going to sleep
135         let mut m = self.mutex.lock();
136 
137         match self
138             .state
139             .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
140         {
141             Ok(_) => {}
142             Err(NOTIFIED) => {
143                 // We must read here, even though we know it will be `NOTIFIED`.
144                 // This is because `unpark` may have been called again since we read
145                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
146                 // acquire operation that synchronizes with that `unpark` to observe
147                 // any writes it made before the call to unpark. To do that we must
148                 // read from the write it made to `state`.
149                 let old = self.state.swap(EMPTY, SeqCst);
150                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
151 
152                 return;
153             }
154             Err(actual) => panic!("inconsistent park state; actual = {actual}"),
155         }
156 
157         loop {
158             m = self.condvar.wait(m).unwrap();
159 
160             if self
161                 .state
162                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
163                 .is_ok()
164             {
165                 // got a notification
166                 return;
167             }
168 
169             // spurious wakeup, go back to sleep
170         }
171     }
172 
park_driver(&self, driver: &mut Driver, handle: &driver::Handle)173     fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
174         match self
175             .state
176             .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
177         {
178             Ok(_) => {}
179             Err(NOTIFIED) => {
180                 // We must read here, even though we know it will be `NOTIFIED`.
181                 // This is because `unpark` may have been called again since we read
182                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
183                 // acquire operation that synchronizes with that `unpark` to observe
184                 // any writes it made before the call to unpark. To do that we must
185                 // read from the write it made to `state`.
186                 let old = self.state.swap(EMPTY, SeqCst);
187                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
188 
189                 return;
190             }
191             Err(actual) => panic!("inconsistent park state; actual = {actual}"),
192         }
193 
194         driver.park(handle);
195 
196         match self.state.swap(EMPTY, SeqCst) {
197             NOTIFIED => {}      // got a notification, hurray!
198             PARKED_DRIVER => {} // no notification, alas
199             n => panic!("inconsistent park_timeout state: {n}"),
200         }
201     }
202 
unpark(&self, driver: &driver::Handle)203     fn unpark(&self, driver: &driver::Handle) {
204         // To ensure the unparked thread will observe any writes we made before
205         // this call, we must perform a release operation that `park` can
206         // synchronize with. To do that we must write `NOTIFIED` even if `state`
207         // is already `NOTIFIED`. That is why this must be a swap rather than a
208         // compare-and-swap that returns if it reads `NOTIFIED` on failure.
209         match self.state.swap(NOTIFIED, SeqCst) {
210             EMPTY => {}    // no one was waiting
211             NOTIFIED => {} // already unparked
212             PARKED_CONDVAR => self.unpark_condvar(),
213             PARKED_DRIVER => driver.unpark(),
214             actual => panic!("inconsistent state in unpark; actual = {actual}"),
215         }
216     }
217 
unpark_condvar(&self)218     fn unpark_condvar(&self) {
219         // There is a period between when the parked thread sets `state` to
220         // `PARKED` (or last checked `state` in the case of a spurious wake
221         // up) and when it actually waits on `cvar`. If we were to notify
222         // during this period it would be ignored and then when the parked
223         // thread went to sleep it would never wake up. Fortunately, it has
224         // `lock` locked at this stage so we can acquire `lock` to wait until
225         // it is ready to receive the notification.
226         //
227         // Releasing `lock` before the call to `notify_one` means that when the
228         // parked thread wakes it doesn't get woken only to have to wait for us
229         // to release `lock`.
230         drop(self.mutex.lock());
231 
232         self.condvar.notify_one();
233     }
234 
shutdown(&self, handle: &driver::Handle)235     fn shutdown(&self, handle: &driver::Handle) {
236         if let Some(mut driver) = self.shared.driver.try_lock() {
237             driver.shutdown(handle);
238         }
239 
240         self.condvar.notify_all();
241     }
242 }
243