1 //! Parks the runtime. 2 //! 3 //! A combination of the various resource driver park handles. 4 5 use crate::loom::sync::atomic::AtomicUsize; 6 use crate::loom::sync::{Arc, Condvar, Mutex}; 7 use crate::runtime::driver::{self, Driver}; 8 use crate::util::TryLock; 9 10 use std::sync::atomic::Ordering::SeqCst; 11 use std::time::Duration; 12 13 pub(crate) struct Parker { 14 inner: Arc<Inner>, 15 } 16 17 pub(crate) struct Unparker { 18 inner: Arc<Inner>, 19 } 20 21 struct Inner { 22 /// Avoids entering the park if possible 23 state: AtomicUsize, 24 25 /// Used to coordinate access to the driver / condvar 26 mutex: Mutex<()>, 27 28 /// Condvar to block on if the driver is unavailable. 29 condvar: Condvar, 30 31 /// Resource (I/O, time, ...) driver 32 shared: Arc<Shared>, 33 } 34 35 const EMPTY: usize = 0; 36 const PARKED_CONDVAR: usize = 1; 37 const PARKED_DRIVER: usize = 2; 38 const NOTIFIED: usize = 3; 39 40 /// Shared across multiple Parker handles 41 struct Shared { 42 /// Shared driver. Only one thread at a time can use this 43 driver: TryLock<Driver>, 44 } 45 46 impl Parker { new(driver: Driver) -> Parker47 pub(crate) fn new(driver: Driver) -> Parker { 48 Parker { 49 inner: Arc::new(Inner { 50 state: AtomicUsize::new(EMPTY), 51 mutex: Mutex::new(()), 52 condvar: Condvar::new(), 53 shared: Arc::new(Shared { 54 driver: TryLock::new(driver), 55 }), 56 }), 57 } 58 } 59 unpark(&self) -> Unparker60 pub(crate) fn unpark(&self) -> Unparker { 61 Unparker { 62 inner: self.inner.clone(), 63 } 64 } 65 park(&mut self, handle: &driver::Handle)66 pub(crate) fn park(&mut self, handle: &driver::Handle) { 67 self.inner.park(handle); 68 } 69 park_timeout(&mut self, handle: &driver::Handle, duration: Duration)70 pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { 71 // Only parking with zero is supported... 72 assert_eq!(duration, Duration::from_millis(0)); 73 74 if let Some(mut driver) = self.inner.shared.driver.try_lock() { 75 driver.park_timeout(handle, duration) 76 } 77 } 78 shutdown(&mut self, handle: &driver::Handle)79 pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { 80 self.inner.shutdown(handle); 81 } 82 } 83 84 impl Clone for Parker { clone(&self) -> Parker85 fn clone(&self) -> Parker { 86 Parker { 87 inner: Arc::new(Inner { 88 state: AtomicUsize::new(EMPTY), 89 mutex: Mutex::new(()), 90 condvar: Condvar::new(), 91 shared: self.inner.shared.clone(), 92 }), 93 } 94 } 95 } 96 97 impl Unparker { unpark(&self, driver: &driver::Handle)98 pub(crate) fn unpark(&self, driver: &driver::Handle) { 99 self.inner.unpark(driver); 100 } 101 } 102 103 impl Inner { 104 /// Parks the current thread for at most `dur`. park(&self, handle: &driver::Handle)105 fn park(&self, handle: &driver::Handle) { 106 // If we were previously notified then we consume this notification and 107 // return quickly. 108 if self 109 .state 110 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 111 .is_ok() 112 { 113 return; 114 } 115 116 if let Some(mut driver) = self.shared.driver.try_lock() { 117 self.park_driver(&mut driver, handle); 118 } else { 119 self.park_condvar(); 120 } 121 } 122 park_condvar(&self)123 fn park_condvar(&self) { 124 // Otherwise we need to coordinate going to sleep 125 let mut m = self.mutex.lock(); 126 127 match self 128 .state 129 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) 130 { 131 Ok(_) => {} 132 Err(NOTIFIED) => { 133 // We must read here, even though we know it will be `NOTIFIED`. 134 // This is because `unpark` may have been called again since we read 135 // `NOTIFIED` in the `compare_exchange` above. We must perform an 136 // acquire operation that synchronizes with that `unpark` to observe 137 // any writes it made before the call to unpark. To do that we must 138 // read from the write it made to `state`. 139 let old = self.state.swap(EMPTY, SeqCst); 140 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 141 142 return; 143 } 144 Err(actual) => panic!("inconsistent park state; actual = {}", actual), 145 } 146 147 loop { 148 m = self.condvar.wait(m).unwrap(); 149 150 if self 151 .state 152 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 153 .is_ok() 154 { 155 // got a notification 156 return; 157 } 158 159 // spurious wakeup, go back to sleep 160 } 161 } 162 park_driver(&self, driver: &mut Driver, handle: &driver::Handle)163 fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { 164 match self 165 .state 166 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) 167 { 168 Ok(_) => {} 169 Err(NOTIFIED) => { 170 // We must read here, even though we know it will be `NOTIFIED`. 171 // This is because `unpark` may have been called again since we read 172 // `NOTIFIED` in the `compare_exchange` above. We must perform an 173 // acquire operation that synchronizes with that `unpark` to observe 174 // any writes it made before the call to unpark. To do that we must 175 // read from the write it made to `state`. 176 let old = self.state.swap(EMPTY, SeqCst); 177 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 178 179 return; 180 } 181 Err(actual) => panic!("inconsistent park state; actual = {}", actual), 182 } 183 184 driver.park(handle); 185 186 match self.state.swap(EMPTY, SeqCst) { 187 NOTIFIED => {} // got a notification, hurray! 188 PARKED_DRIVER => {} // no notification, alas 189 n => panic!("inconsistent park_timeout state: {}", n), 190 } 191 } 192 unpark(&self, driver: &driver::Handle)193 fn unpark(&self, driver: &driver::Handle) { 194 // To ensure the unparked thread will observe any writes we made before 195 // this call, we must perform a release operation that `park` can 196 // synchronize with. To do that we must write `NOTIFIED` even if `state` 197 // is already `NOTIFIED`. That is why this must be a swap rather than a 198 // compare-and-swap that returns if it reads `NOTIFIED` on failure. 199 match self.state.swap(NOTIFIED, SeqCst) { 200 EMPTY => {} // no one was waiting 201 NOTIFIED => {} // already unparked 202 PARKED_CONDVAR => self.unpark_condvar(), 203 PARKED_DRIVER => driver.unpark(), 204 actual => panic!("inconsistent state in unpark; actual = {}", actual), 205 } 206 } 207 unpark_condvar(&self)208 fn unpark_condvar(&self) { 209 // There is a period between when the parked thread sets `state` to 210 // `PARKED` (or last checked `state` in the case of a spurious wake 211 // up) and when it actually waits on `cvar`. If we were to notify 212 // during this period it would be ignored and then when the parked 213 // thread went to sleep it would never wake up. Fortunately, it has 214 // `lock` locked at this stage so we can acquire `lock` to wait until 215 // it is ready to receive the notification. 216 // 217 // Releasing `lock` before the call to `notify_one` means that when the 218 // parked thread wakes it doesn't get woken only to have to wait for us 219 // to release `lock`. 220 drop(self.mutex.lock()); 221 222 self.condvar.notify_one() 223 } 224 shutdown(&self, handle: &driver::Handle)225 fn shutdown(&self, handle: &driver::Handle) { 226 if let Some(mut driver) = self.shared.driver.try_lock() { 227 driver.shutdown(handle); 228 } 229 230 self.condvar.notify_all(); 231 } 232 } 233