1 //! Parks the runtime. 2 //! 3 //! A combination of the various resource driver park handles. 4 5 use crate::loom::sync::atomic::AtomicUsize; 6 use crate::loom::sync::{Arc, Condvar, Mutex}; 7 use crate::runtime::driver::{self, Driver}; 8 use crate::util::TryLock; 9 10 use std::sync::atomic::Ordering::SeqCst; 11 use std::time::Duration; 12 13 #[cfg(loom)] 14 use crate::runtime::park::CURRENT_THREAD_PARK_COUNT; 15 16 pub(crate) struct Parker { 17 inner: Arc<Inner>, 18 } 19 20 pub(crate) struct Unparker { 21 inner: Arc<Inner>, 22 } 23 24 struct Inner { 25 /// Avoids entering the park if possible 26 state: AtomicUsize, 27 28 /// Used to coordinate access to the driver / `condvar` 29 mutex: Mutex<()>, 30 31 /// `Condvar` to block on if the driver is unavailable. 32 condvar: Condvar, 33 34 /// Resource (I/O, time, ...) driver 35 shared: Arc<Shared>, 36 } 37 38 const EMPTY: usize = 0; 39 const PARKED_CONDVAR: usize = 1; 40 const PARKED_DRIVER: usize = 2; 41 const NOTIFIED: usize = 3; 42 43 /// Shared across multiple Parker handles 44 struct Shared { 45 /// Shared driver. Only one thread at a time can use this 46 driver: TryLock<Driver>, 47 } 48 49 impl Parker { new(driver: Driver) -> Parker50 pub(crate) fn new(driver: Driver) -> Parker { 51 Parker { 52 inner: Arc::new(Inner { 53 state: AtomicUsize::new(EMPTY), 54 mutex: Mutex::new(()), 55 condvar: Condvar::new(), 56 shared: Arc::new(Shared { 57 driver: TryLock::new(driver), 58 }), 59 }), 60 } 61 } 62 unpark(&self) -> Unparker63 pub(crate) fn unpark(&self) -> Unparker { 64 Unparker { 65 inner: self.inner.clone(), 66 } 67 } 68 park(&mut self, handle: &driver::Handle)69 pub(crate) fn park(&mut self, handle: &driver::Handle) { 70 self.inner.park(handle); 71 } 72 park_timeout(&mut self, handle: &driver::Handle, duration: Duration)73 pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { 74 // Only parking with zero is supported... 75 assert_eq!(duration, Duration::from_millis(0)); 76 77 if let Some(mut driver) = self.inner.shared.driver.try_lock() { 78 driver.park_timeout(handle, duration); 79 } else { 80 // https://github.com/tokio-rs/tokio/issues/6536 81 // Hacky, but it's just for loom tests. The counter gets incremented during 82 // `park_timeout`, but we still have to increment the counter if we can't acquire the 83 // lock. 84 #[cfg(loom)] 85 CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst)); 86 } 87 } 88 shutdown(&mut self, handle: &driver::Handle)89 pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { 90 self.inner.shutdown(handle); 91 } 92 } 93 94 impl Clone for Parker { clone(&self) -> Parker95 fn clone(&self) -> Parker { 96 Parker { 97 inner: Arc::new(Inner { 98 state: AtomicUsize::new(EMPTY), 99 mutex: Mutex::new(()), 100 condvar: Condvar::new(), 101 shared: self.inner.shared.clone(), 102 }), 103 } 104 } 105 } 106 107 impl Unparker { unpark(&self, driver: &driver::Handle)108 pub(crate) fn unpark(&self, driver: &driver::Handle) { 109 self.inner.unpark(driver); 110 } 111 } 112 113 impl Inner { 114 /// Parks the current thread for at most `dur`. park(&self, handle: &driver::Handle)115 fn park(&self, handle: &driver::Handle) { 116 // If we were previously notified then we consume this notification and 117 // return quickly. 118 if self 119 .state 120 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 121 .is_ok() 122 { 123 return; 124 } 125 126 if let Some(mut driver) = self.shared.driver.try_lock() { 127 self.park_driver(&mut driver, handle); 128 } else { 129 self.park_condvar(); 130 } 131 } 132 park_condvar(&self)133 fn park_condvar(&self) { 134 // Otherwise we need to coordinate going to sleep 135 let mut m = self.mutex.lock(); 136 137 match self 138 .state 139 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) 140 { 141 Ok(_) => {} 142 Err(NOTIFIED) => { 143 // We must read here, even though we know it will be `NOTIFIED`. 144 // This is because `unpark` may have been called again since we read 145 // `NOTIFIED` in the `compare_exchange` above. We must perform an 146 // acquire operation that synchronizes with that `unpark` to observe 147 // any writes it made before the call to unpark. To do that we must 148 // read from the write it made to `state`. 149 let old = self.state.swap(EMPTY, SeqCst); 150 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 151 152 return; 153 } 154 Err(actual) => panic!("inconsistent park state; actual = {actual}"), 155 } 156 157 loop { 158 m = self.condvar.wait(m).unwrap(); 159 160 if self 161 .state 162 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 163 .is_ok() 164 { 165 // got a notification 166 return; 167 } 168 169 // spurious wakeup, go back to sleep 170 } 171 } 172 park_driver(&self, driver: &mut Driver, handle: &driver::Handle)173 fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { 174 match self 175 .state 176 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) 177 { 178 Ok(_) => {} 179 Err(NOTIFIED) => { 180 // We must read here, even though we know it will be `NOTIFIED`. 181 // This is because `unpark` may have been called again since we read 182 // `NOTIFIED` in the `compare_exchange` above. We must perform an 183 // acquire operation that synchronizes with that `unpark` to observe 184 // any writes it made before the call to unpark. To do that we must 185 // read from the write it made to `state`. 186 let old = self.state.swap(EMPTY, SeqCst); 187 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 188 189 return; 190 } 191 Err(actual) => panic!("inconsistent park state; actual = {actual}"), 192 } 193 194 driver.park(handle); 195 196 match self.state.swap(EMPTY, SeqCst) { 197 NOTIFIED => {} // got a notification, hurray! 198 PARKED_DRIVER => {} // no notification, alas 199 n => panic!("inconsistent park_timeout state: {n}"), 200 } 201 } 202 unpark(&self, driver: &driver::Handle)203 fn unpark(&self, driver: &driver::Handle) { 204 // To ensure the unparked thread will observe any writes we made before 205 // this call, we must perform a release operation that `park` can 206 // synchronize with. To do that we must write `NOTIFIED` even if `state` 207 // is already `NOTIFIED`. That is why this must be a swap rather than a 208 // compare-and-swap that returns if it reads `NOTIFIED` on failure. 209 match self.state.swap(NOTIFIED, SeqCst) { 210 EMPTY => {} // no one was waiting 211 NOTIFIED => {} // already unparked 212 PARKED_CONDVAR => self.unpark_condvar(), 213 PARKED_DRIVER => driver.unpark(), 214 actual => panic!("inconsistent state in unpark; actual = {actual}"), 215 } 216 } 217 unpark_condvar(&self)218 fn unpark_condvar(&self) { 219 // There is a period between when the parked thread sets `state` to 220 // `PARKED` (or last checked `state` in the case of a spurious wake 221 // up) and when it actually waits on `cvar`. If we were to notify 222 // during this period it would be ignored and then when the parked 223 // thread went to sleep it would never wake up. Fortunately, it has 224 // `lock` locked at this stage so we can acquire `lock` to wait until 225 // it is ready to receive the notification. 226 // 227 // Releasing `lock` before the call to `notify_one` means that when the 228 // parked thread wakes it doesn't get woken only to have to wait for us 229 // to release `lock`. 230 drop(self.mutex.lock()); 231 232 self.condvar.notify_one(); 233 } 234 shutdown(&self, handle: &driver::Handle)235 fn shutdown(&self, handle: &driver::Handle) { 236 if let Some(mut driver) = self.shared.driver.try_lock() { 237 driver.shutdown(handle); 238 } 239 240 self.condvar.notify_all(); 241 } 242 } 243