1 //! Parks the runtime.
2 //!
3 //! A combination of the various resource driver park handles.
4 
5 use crate::loom::sync::atomic::AtomicUsize;
6 use crate::loom::sync::{Arc, Condvar, Mutex};
7 use crate::runtime::driver::{self, Driver};
8 use crate::util::TryLock;
9 
10 use std::sync::atomic::Ordering::SeqCst;
11 use std::time::Duration;
12 
/// Handle a thread uses to park itself.
///
/// Each `Parker` owns its own park state (`Inner`). Cloning a `Parker` creates
/// a fresh park state that shares only the resource driver with the original.
pub(crate) struct Parker {
    /// Park state for this handle. The same `Arc` is handed to every
    /// `Unparker` created through `Parker::unpark`.
    inner: Arc<Inner>,
}
16 
/// Handle used to notify the `Parker` it was created from.
pub(crate) struct Unparker {
    /// Park state shared with the originating `Parker`.
    inner: Arc<Inner>,
}
20 
/// Park state shared between one `Parker` and the `Unparker`s created from it.
struct Inner {
    /// Avoids entering the park if possible.
    ///
    /// Always holds one of `EMPTY`, `PARKED_CONDVAR`, `PARKED_DRIVER` or
    /// `NOTIFIED`.
    state: AtomicUsize,

    /// Held by a thread while it parks on `condvar`; `unpark_condvar`
    /// acquires it before notifying so the notification cannot be missed.
    mutex: Mutex<()>,

    /// Condvar to block on if the driver is unavailable.
    condvar: Condvar,

    /// Resource (I/O, time, ...) driver, shared with every `Parker` cloned
    /// from the same original.
    shared: Arc<Shared>,
}
34 
// Values stored in `Inner::state`.

/// Not parked and no notification pending. This is the initial state.
const EMPTY: usize = 0;
/// The owning thread is parked (or about to park) on the condvar.
const PARKED_CONDVAR: usize = 1;
/// The owning thread is parked (or about to park) on the resource driver.
const PARKED_DRIVER: usize = 2;
/// `unpark` has been called; a park consumes this and resets to `EMPTY`.
const NOTIFIED: usize = 3;
39 
/// State shared by a `Parker` and all of its clones.
struct Shared {
    /// Shared driver. Only one thread at a time can use this: a thread that
    /// fails to `try_lock` it in `Inner::park` parks on its own condvar
    /// instead.
    driver: TryLock<Driver>,
}
45 
46 impl Parker {
new(driver: Driver) -> Parker47     pub(crate) fn new(driver: Driver) -> Parker {
48         Parker {
49             inner: Arc::new(Inner {
50                 state: AtomicUsize::new(EMPTY),
51                 mutex: Mutex::new(()),
52                 condvar: Condvar::new(),
53                 shared: Arc::new(Shared {
54                     driver: TryLock::new(driver),
55                 }),
56             }),
57         }
58     }
59 
unpark(&self) -> Unparker60     pub(crate) fn unpark(&self) -> Unparker {
61         Unparker {
62             inner: self.inner.clone(),
63         }
64     }
65 
park(&mut self, handle: &driver::Handle)66     pub(crate) fn park(&mut self, handle: &driver::Handle) {
67         self.inner.park(handle);
68     }
69 
park_timeout(&mut self, handle: &driver::Handle, duration: Duration)70     pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
71         // Only parking with zero is supported...
72         assert_eq!(duration, Duration::from_millis(0));
73 
74         if let Some(mut driver) = self.inner.shared.driver.try_lock() {
75             driver.park_timeout(handle, duration)
76         }
77     }
78 
shutdown(&mut self, handle: &driver::Handle)79     pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
80         self.inner.shutdown(handle);
81     }
82 }
83 
84 impl Clone for Parker {
clone(&self) -> Parker85     fn clone(&self) -> Parker {
86         Parker {
87             inner: Arc::new(Inner {
88                 state: AtomicUsize::new(EMPTY),
89                 mutex: Mutex::new(()),
90                 condvar: Condvar::new(),
91                 shared: self.inner.shared.clone(),
92             }),
93         }
94     }
95 }
96 
97 impl Unparker {
unpark(&self, driver: &driver::Handle)98     pub(crate) fn unpark(&self, driver: &driver::Handle) {
99         self.inner.unpark(driver);
100     }
101 }
102 
103 impl Inner {
104     /// Parks the current thread for at most `dur`.
park(&self, handle: &driver::Handle)105     fn park(&self, handle: &driver::Handle) {
106         // If we were previously notified then we consume this notification and
107         // return quickly.
108         if self
109             .state
110             .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
111             .is_ok()
112         {
113             return;
114         }
115 
116         if let Some(mut driver) = self.shared.driver.try_lock() {
117             self.park_driver(&mut driver, handle);
118         } else {
119             self.park_condvar();
120         }
121     }
122 
park_condvar(&self)123     fn park_condvar(&self) {
124         // Otherwise we need to coordinate going to sleep
125         let mut m = self.mutex.lock();
126 
127         match self
128             .state
129             .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
130         {
131             Ok(_) => {}
132             Err(NOTIFIED) => {
133                 // We must read here, even though we know it will be `NOTIFIED`.
134                 // This is because `unpark` may have been called again since we read
135                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
136                 // acquire operation that synchronizes with that `unpark` to observe
137                 // any writes it made before the call to unpark. To do that we must
138                 // read from the write it made to `state`.
139                 let old = self.state.swap(EMPTY, SeqCst);
140                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
141 
142                 return;
143             }
144             Err(actual) => panic!("inconsistent park state; actual = {}", actual),
145         }
146 
147         loop {
148             m = self.condvar.wait(m).unwrap();
149 
150             if self
151                 .state
152                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
153                 .is_ok()
154             {
155                 // got a notification
156                 return;
157             }
158 
159             // spurious wakeup, go back to sleep
160         }
161     }
162 
park_driver(&self, driver: &mut Driver, handle: &driver::Handle)163     fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
164         match self
165             .state
166             .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
167         {
168             Ok(_) => {}
169             Err(NOTIFIED) => {
170                 // We must read here, even though we know it will be `NOTIFIED`.
171                 // This is because `unpark` may have been called again since we read
172                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
173                 // acquire operation that synchronizes with that `unpark` to observe
174                 // any writes it made before the call to unpark. To do that we must
175                 // read from the write it made to `state`.
176                 let old = self.state.swap(EMPTY, SeqCst);
177                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
178 
179                 return;
180             }
181             Err(actual) => panic!("inconsistent park state; actual = {}", actual),
182         }
183 
184         driver.park(handle);
185 
186         match self.state.swap(EMPTY, SeqCst) {
187             NOTIFIED => {}      // got a notification, hurray!
188             PARKED_DRIVER => {} // no notification, alas
189             n => panic!("inconsistent park_timeout state: {}", n),
190         }
191     }
192 
unpark(&self, driver: &driver::Handle)193     fn unpark(&self, driver: &driver::Handle) {
194         // To ensure the unparked thread will observe any writes we made before
195         // this call, we must perform a release operation that `park` can
196         // synchronize with. To do that we must write `NOTIFIED` even if `state`
197         // is already `NOTIFIED`. That is why this must be a swap rather than a
198         // compare-and-swap that returns if it reads `NOTIFIED` on failure.
199         match self.state.swap(NOTIFIED, SeqCst) {
200             EMPTY => {}    // no one was waiting
201             NOTIFIED => {} // already unparked
202             PARKED_CONDVAR => self.unpark_condvar(),
203             PARKED_DRIVER => driver.unpark(),
204             actual => panic!("inconsistent state in unpark; actual = {}", actual),
205         }
206     }
207 
unpark_condvar(&self)208     fn unpark_condvar(&self) {
209         // There is a period between when the parked thread sets `state` to
210         // `PARKED` (or last checked `state` in the case of a spurious wake
211         // up) and when it actually waits on `cvar`. If we were to notify
212         // during this period it would be ignored and then when the parked
213         // thread went to sleep it would never wake up. Fortunately, it has
214         // `lock` locked at this stage so we can acquire `lock` to wait until
215         // it is ready to receive the notification.
216         //
217         // Releasing `lock` before the call to `notify_one` means that when the
218         // parked thread wakes it doesn't get woken only to have to wait for us
219         // to release `lock`.
220         drop(self.mutex.lock());
221 
222         self.condvar.notify_one()
223     }
224 
shutdown(&self, handle: &driver::Handle)225     fn shutdown(&self, handle: &driver::Handle) {
226         if let Some(mut driver) = self.shared.driver.try_lock() {
227             driver.shutdown(handle);
228         }
229 
230         self.condvar.notify_all();
231     }
232 }
233