//! Coordinates idling workers
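//!
//! Lock-free state (the atomic counters and the idle core bitmap) lives in
//! [`Idle`]; state guarded by the scheduler mutex lives in [`Synced`].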

#![allow(dead_code)]

use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

pub(super) struct Idle {
    /// Number of searching cores
    num_searching: AtomicUsize,

    /// Number of idle cores
    num_idle: AtomicUsize,

    /// Map of idle cores
    idle_map: IdleMap,

    /// Used to catch false-negatives when waking workers
    needs_searching: AtomicBool,

    /// Total number of cores
    num_cores: usize,
}

pub(super) struct IdleMap {
    chunks: Vec<AtomicUsize>,
}

pub(super) struct Snapshot {
    chunks: Vec<usize>,
}

/// Data synchronized by the scheduler mutex
pub(super) struct Synced {
    /// Worker IDs that are currently sleeping
    sleepers: Vec<usize>,

    /// Cores available for workers
    available_cores: Vec<Box<Core>>,
}

impl Idle {
    pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
        let idle = Idle {
            num_searching: AtomicUsize::new(0),
            num_idle: AtomicUsize::new(cores.len()),
            idle_map: IdleMap::new(&cores),
            needs_searching: AtomicBool::new(false),
            num_cores: cores.len(),
        };

        let synced = Synced {
            sleepers: Vec::with_capacity(num_workers),
            available_cores: cores,
        };

        (idle, synced)
    }

    pub(super) fn needs_searching(&self) -> bool {
        self.needs_searching.load(Acquire)
    }

    pub(super) fn num_idle(&self, synced: &Synced) -> usize {
        #[cfg(not(loom))]
        debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
        synced.available_cores.len()
    }

    pub(super) fn num_searching(&self) -> usize {
        self.num_searching.load(Acquire)
    }

    pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
        snapshot.update(&self.idle_map)
    }

    /// Try to acquire an available core
    pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
        let ret = synced.available_cores.pop();

        if let Some(core) = &ret {
            // Decrement the number of idle cores
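            //
            // A separate load and store is sufficient here (no RMW needed):
            // `num_idle` is only updated while holding the scheduler lock,
            // which the caller holds via `synced`.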
            let num_idle = self.num_idle.load(Acquire) - 1;
            debug_assert_eq!(num_idle, synced.available_cores.len());
            self.num_idle.store(num_idle, Release);

            self.idle_map.unset(core.index);
            debug_assert!(self.idle_map.matches(&synced.available_cores));
        }

        ret
    }

    /// We need at least one searching worker
    pub(super) fn notify_local(&self, shared: &Shared) {
        if self.num_searching.load(Acquire) != 0 {
            // There already is a searching worker. Note that this could be a
            // false positive. However, because this method is called **from** a
            // worker, we know that there is at least one worker currently
            // awake, so the scheduler won't deadlock.
            return;
        }

        if self.num_idle.load(Acquire) == 0 {
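            // No idle cores are available to hand off, so flag that a worker
            // should transition to searching once one frees a core (see
            // `transition_worker_to_searching_if_needed`).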
            self.needs_searching.store(true, Release);
            return;
        }

        // There aren't any searching workers. Try to initialize one
        if self
            .num_searching
            .compare_exchange(0, 1, AcqRel, Acquire)
            .is_err()
        {
            // Failing the compare_exchange means another thread concurrently
            // launched a searching worker.
            return;
        }

        super::counters::inc_num_unparks_local();

        // Acquire the lock
        let synced = shared.synced.lock();
        self.notify_synced(synced, shared);
    }

    /// Notifies a single worker
    pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        if synced.idle.sleepers.is_empty() {
            self.needs_searching.store(true, Release);
            return;
        }

        // We need to establish a stronger barrier than with `notify_local`,
        // so `num_searching` is incremented unconditionally rather than via
        // `compare_exchange`.
        self.num_searching.fetch_add(1, AcqRel);

        self.notify_synced(synced, shared);
    }

    /// Notify a worker while synced
    fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        // Find a sleeping worker
        if let Some(worker) = synced.idle.sleepers.pop() {
            // Find an available core
            if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
                debug_assert!(!core.is_searching);
                core.is_searching = true;

                // Assign the core to the worker
                synced.assigned_cores[worker] = Some(core);

                // Drop the lock before notifying the condvar.
                drop(synced);

                super::counters::inc_num_unparks_remote();

                // Notify the worker
                shared.condvars[worker].notify_one();
                return;
            } else {
                synced.idle.sleepers.push(worker);
            }
        }

        super::counters::inc_notify_no_core();

        // Set the `needs_searching` flag; this happens *while* the lock is
        // held.
        self.needs_searching.store(true, Release);
        self.num_searching.fetch_sub(1, Release);

        // Explicit mutex guard drop to show that holding the guard to this
        // point is significant. `needs_searching` and `num_searching` must be
        // updated in the critical section.
        drop(synced);
    }

    pub(super) fn notify_mult(
        &self,
        synced: &mut worker::Synced,
        workers: &mut Vec<usize>,
        num: usize,
    ) {
        debug_assert!(workers.is_empty());

        for _ in 0..num {
            if let Some(worker) = synced.idle.sleepers.pop() {
                // TODO: can this be switched to use next_available_core?
                if let Some(core) = synced.idle.available_cores.pop() {
                    debug_assert!(!core.is_searching);

                    self.idle_map.unset(core.index);

                    synced.assigned_cores[worker] = Some(core);

                    workers.push(worker);

                    continue;
                } else {
                    synced.idle.sleepers.push(worker);
                }
            }

            break;
        }

        if !workers.is_empty() {
            debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
            let num_idle = synced.idle.available_cores.len();
            self.num_idle.store(num_idle, Release);
        } else {
            #[cfg(not(loom))]
            debug_assert_eq!(
                synced.idle.available_cores.len(),
                self.num_idle.load(Acquire)
            );
            self.needs_searching.store(true, Release);
        }
    }

    pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
        // Wake every sleeping worker and assign a core to it. There may not be
        // enough sleeping workers for all cores, but other workers will
        // eventually find the cores and shut them down.
        while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
            let worker = synced.idle.sleepers.pop().unwrap();
            let core = self.try_acquire_available_core(&mut synced.idle).unwrap();

            synced.assigned_cores[worker] = Some(core);
            shared.condvars[worker].notify_one();
        }

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Wake up any other workers
        while let Some(index) = synced.idle.sleepers.pop() {
            shared.condvars[index].notify_one();
        }
    }

    pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
        // If there are any remaining cores, shut them down here.
        //
        // This code is a bit convoluted to avoid lock-reentry.
        while let Some(core) = {
            let mut synced = shared.synced.lock();
            self.try_acquire_available_core(&mut synced.idle)
        } {
            shared.shutdown_core(handle, core);
        }
    }

    /// The worker releases the given core, making it available to other workers
    /// that are waiting.
    pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
        // The core should not be searching at this point
        debug_assert!(!core.is_searching);

        // Check that there are no pending tasks in the global queue
        debug_assert!(synced.inject.is_empty());

        let num_idle = synced.idle.available_cores.len();
        #[cfg(not(loom))]
        debug_assert_eq!(num_idle, self.num_idle.load(Acquire));

        self.idle_map.set(core.index);

        // Store the core in the list of available cores
        synced.idle.available_cores.push(core);

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Update `num_idle`
        self.num_idle.store(num_idle + 1, Release);
    }

    pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) {
        // Store the worker index in the list of sleepers
        synced.idle.sleepers.push(index);

        // The worker's assigned core slot should be empty
        debug_assert!(synced.assigned_cores[index].is_none());
    }

    pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
        debug_assert!(!core.is_searching);

        let num_searching = self.num_searching.load(Acquire);
        let num_idle = self.num_idle.load(Acquire);

        // Throttle the number of searching workers: if at least half of the
        // active (non-idle) cores are already searching, skip the transition.
        if 2 * num_searching >= self.num_cores - num_idle {
            return;
        }

        self.transition_worker_to_searching(core);
    }

    /// Needs to happen while synchronized in order to avoid races
    pub(super) fn transition_worker_to_searching_if_needed(
        &self,
        _synced: &mut Synced,
        core: &mut Core,
    ) -> bool {
        if self.needs_searching.load(Acquire) {
            // Needs to be called while holding the lock
            self.transition_worker_to_searching(core);
            true
        } else {
            false
        }
    }

    pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
        core.is_searching = true;
        self.num_searching.fetch_add(1, AcqRel);
        self.needs_searching.store(false, Release);
    }

    /// A lightweight transition from searching -> running.
    ///
    /// Returns `true` if this is the final searching worker. The caller
    /// **must** notify a new worker.
    pub(super) fn transition_worker_from_searching(&self) -> bool {
        let prev = self.num_searching.fetch_sub(1, AcqRel);
        debug_assert!(prev > 0);

        prev == 1
    }
}

// Each chunk of the idle map stores `BITS` entries; `BIT_MASK` extracts an
// index's bit offset within its chunk.
const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;

impl IdleMap {
    fn new(cores: &[Box<Core>]) -> IdleMap {
        let ret = IdleMap::new_n(num_chunks(cores.len()));
        ret.set_all(cores);

        ret
    }

    fn new_n(n: usize) -> IdleMap {
        let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect();
        IdleMap { chunks }
    }

    // `set` and `unset` use separate load and store operations rather than
    // atomic RMW instructions; callers update the map while holding the
    // scheduler lock, so the bitmap is not mutated concurrently.
    fn set(&self, index: usize) {
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev | mask;
        self.chunks[chunk].store(next, Release);
    }

    fn set_all(&self, cores: &[Box<Core>]) {
        for core in cores {
            self.set(core.index);
        }
    }

    fn unset(&self, index: usize) {
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev & !mask;
        self.chunks[chunk].store(next, Release);
    }

    fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
        let expect = IdleMap::new_n(self.chunks.len());
        expect.set_all(idle_cores);

        for (i, chunk) in expect.chunks.iter().enumerate() {
            if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
                return false;
            }
        }

        true
    }
}

impl Snapshot {
    pub(crate) fn new(idle: &Idle) -> Snapshot {
        let chunks = vec![0; idle.idle_map.chunks.len()];
        let mut ret = Snapshot { chunks };
        ret.update(&idle.idle_map);
        ret
    }

    fn update(&mut self, idle_map: &IdleMap) {
        for i in 0..self.chunks.len() {
            self.chunks[i] = idle_map.chunks[i].load(Acquire);
        }
    }

    pub(super) fn is_idle(&self, index: usize) -> bool {
        let (chunk, mask) = index_to_mask(index);
        debug_assert!(
            chunk < self.chunks.len(),
            "index={}; chunks={}",
            index,
            self.chunks.len()
        );
        self.chunks[chunk] & mask == mask
    }
}

/// Returns the number of `usize` chunks needed to hold one bit per core.
fn num_chunks(max_cores: usize) -> usize {
    (max_cores / BITS) + 1
}

/// Maps a core index to its chunk and the bit mask within that chunk. For
/// example, on a 64-bit target, index 70 maps to chunk 1 (70 / 64) with mask
/// `1 << 6` (70 % 64 == 6).
fn index_to_mask(index: usize) -> (usize, usize) {
    let mask = 1 << (index & BIT_MASK);
    let chunk = index / BITS;

    (chunk, mask)
}

/// Number of cores currently checked out by workers. This assumes
/// `available_cores` was created with capacity equal to the total number of
/// cores and never grows past it.
fn num_active_workers(synced: &Synced) -> usize {
    synced.available_cores.capacity() - synced.available_cores.len()
}
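
#[cfg(test)]
mod tests {
    //! A minimal sketch of tests for the bitmap helpers. These assume a
    //! 64-bit target (`usize::BITS == 64`) and a non-loom build, and only
    //! exercise the free functions and `IdleMap` defined above.

    use super::*;

    #[test]
    fn chunk_math() {
        // One chunk is always allocated, plus one per `BITS` cores.
        assert_eq!(num_chunks(1), 1);
        assert_eq!(num_chunks(64), 2);

        // Index 70 lands in chunk 1 (70 / 64) at bit 6 (70 % 64).
        let (chunk, mask) = index_to_mask(70);
        assert_eq!(chunk, 1);
        assert_eq!(mask, 1 << 6);
    }

    #[test]
    fn set_and_unset() {
        let map = IdleMap::new_n(2);

        // Setting index 70 flips bit 6 of the second chunk.
        map.set(70);
        assert_eq!(map.chunks[1].load(Acquire), 1 << 6);

        // Unsetting clears it again.
        map.unset(70);
        assert_eq!(map.chunks[1].load(Acquire), 0);
    }
}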