//! Coordinates idling workers
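//!
//! Idle cores are tracked both in a bitmap (`IdleMap`) with associated
//! atomic counters, for fast lock-free checks, and in the mutex-protected
//! `Synced` state, which holds the authoritative list of available cores.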

#![allow(dead_code)]

use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

pub(super) struct Idle {
    /// Number of searching cores
    num_searching: AtomicUsize,

    /// Number of idle cores
    num_idle: AtomicUsize,

    /// Map of idle cores
    idle_map: IdleMap,

    /// Used to catch false-negatives when waking workers
    needs_searching: AtomicBool,

    /// Total number of cores
    num_cores: usize,
}

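/// A bitmap with one bit per core, set while the core is idle.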
pub(super) struct IdleMap {
    chunks: Vec<AtomicUsize>,
}

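/// A point-in-time copy of the idle bitmap, letting a worker scan for idle
/// cores without repeatedly loading the shared atomics.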
pub(super) struct Snapshot {
    chunks: Vec<usize>,
}

/// Data synchronized by the scheduler mutex
pub(super) struct Synced {
    /// Worker IDs that are currently sleeping
    sleepers: Vec<usize>,

    /// Cores available for workers
    available_cores: Vec<Box<Core>>,
}

impl Idle {
    pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
        let idle = Idle {
            num_searching: AtomicUsize::new(0),
            num_idle: AtomicUsize::new(cores.len()),
            idle_map: IdleMap::new(&cores),
            needs_searching: AtomicBool::new(false),
            num_cores: cores.len(),
        };

        let synced = Synced {
            sleepers: Vec::with_capacity(num_workers),
            available_cores: cores,
        };

        (idle, synced)
    }

    pub(super) fn needs_searching(&self) -> bool {
        self.needs_searching.load(Acquire)
    }

    pub(super) fn num_idle(&self, synced: &Synced) -> usize {
        #[cfg(not(loom))]
        debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
        synced.available_cores.len()
    }

    pub(super) fn num_searching(&self) -> usize {
        self.num_searching.load(Acquire)
    }

    pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
        snapshot.update(&self.idle_map)
    }

    /// Try to acquire an available core
    pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
        let ret = synced.available_cores.pop();

        if let Some(core) = &ret {
            // Decrement the number of idle cores
            let num_idle = self.num_idle.load(Acquire) - 1;
            debug_assert_eq!(num_idle, synced.available_cores.len());
            self.num_idle.store(num_idle, Release);

            self.idle_map.unset(core.index);
            debug_assert!(self.idle_map.matches(&synced.available_cores));
        }

        ret
    }

    /// Ensures at least one worker is searching for work
    pub(super) fn notify_local(&self, shared: &Shared) {
        if self.num_searching.load(Acquire) != 0 {
            // There already is a searching worker. Note that this could be a
            // false positive. However, because this method is called **from** a
            // worker, we know that there is at least one worker currently
            // awake, so the scheduler won't deadlock.
            return;
        }

        if self.num_idle.load(Acquire) == 0 {
            self.needs_searching.store(true, Release);
            return;
        }

        // There aren't any searching workers. Try to initialize one
        if self
            .num_searching
            .compare_exchange(0, 1, AcqRel, Acquire)
            .is_err()
        {
            // Failing the compare_exchange means another thread concurrently
            // launched a searching worker.
            return;
        }

        super::counters::inc_num_unparks_local();

        // Acquire the lock
        let synced = shared.synced.lock();
        self.notify_synced(synced, shared);
    }

    /// Notifies a single worker
    pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        if synced.idle.sleepers.is_empty() {
            self.needs_searching.store(true, Release);
            return;
        }

        // We need to establish a stronger barrier than with `notify_local`
        self.num_searching.fetch_add(1, AcqRel);

        self.notify_synced(synced, shared);
    }

    /// Notify a worker while synced
    fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        // Find a sleeping worker
        if let Some(worker) = synced.idle.sleepers.pop() {
            // Find an available core
            if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
                debug_assert!(!core.is_searching);
                core.is_searching = true;

                // Assign the core to the worker
                synced.assigned_cores[worker] = Some(core);

                // Drop the lock before notifying the condvar.
                drop(synced);

                super::counters::inc_num_unparks_remote();

                // Notify the worker
                shared.condvars[worker].notify_one();
                return;
            } else {
                synced.idle.sleepers.push(worker);
            }
        }

        super::counters::inc_notify_no_core();

        // Set the `needs_searching` flag; this happens *while* the lock is held.
        self.needs_searching.store(true, Release);
        self.num_searching.fetch_sub(1, Release);

        // Explicit mutex guard drop to show that holding the guard to this
        // point is significant. `needs_searching` and `num_searching` must be
        // updated in the critical section.
        drop(synced);
    }

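    /// Notifies up to `num` sleeping workers, assigning each an available
    /// core and pushing the indices of the workers that must be woken into
    /// `workers` for the caller to signal.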
    pub(super) fn notify_mult(
        &self,
        synced: &mut worker::Synced,
        workers: &mut Vec<usize>,
        num: usize,
    ) {
        debug_assert!(workers.is_empty());

        for _ in 0..num {
            if let Some(worker) = synced.idle.sleepers.pop() {
                // TODO: can this be switched to use next_available_core?
                if let Some(core) = synced.idle.available_cores.pop() {
                    debug_assert!(!core.is_searching);

                    self.idle_map.unset(core.index);

                    synced.assigned_cores[worker] = Some(core);

                    workers.push(worker);

                    continue;
                } else {
                    synced.idle.sleepers.push(worker);
                }
            }

            break;
        }

        if !workers.is_empty() {
            debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
            let num_idle = synced.idle.available_cores.len();
            self.num_idle.store(num_idle, Release);
        } else {
            #[cfg(not(loom))]
            debug_assert_eq!(
                synced.idle.available_cores.len(),
                self.num_idle.load(Acquire)
            );
            self.needs_searching.store(true, Release);
        }
    }

    pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
        // Wake every sleeping worker and assign a core to it. There may not be
        // enough sleeping workers for all cores, but other workers will
        // eventually find the cores and shut them down.
        while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
            let worker = synced.idle.sleepers.pop().unwrap();
            let core = self.try_acquire_available_core(&mut synced.idle).unwrap();

            synced.assigned_cores[worker] = Some(core);
            shared.condvars[worker].notify_one();
        }

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Wake up any other workers
        while let Some(index) = synced.idle.sleepers.pop() {
            shared.condvars[index].notify_one();
        }
    }

    pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
        // If there are any remaining cores, shut them down here.
        //
        // This code is a bit convoluted to avoid lock-reentry.
        while let Some(core) = {
            let mut synced = shared.synced.lock();
            self.try_acquire_available_core(&mut synced.idle)
        } {
            shared.shutdown_core(handle, core);
        }
    }

    /// The worker releases the given core, making it available to other workers
    /// that are waiting.
    pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
        // The core should not be searching at this point
        debug_assert!(!core.is_searching);

        // Check that there are no pending tasks in the global queue
        debug_assert!(synced.inject.is_empty());

        let num_idle = synced.idle.available_cores.len();
        #[cfg(not(loom))]
        debug_assert_eq!(num_idle, self.num_idle.load(Acquire));

        self.idle_map.set(core.index);

        // Store the core in the list of available cores
        synced.idle.available_cores.push(core);

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Update `num_idle`
        self.num_idle.store(num_idle + 1, Release);
    }

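    /// Registers the worker as sleeping so that a later notification can
    /// assign it a core and wake it.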
    pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) {
        // Store the worker index in the list of sleepers
        synced.idle.sleepers.push(index);

        // The worker's assigned core slot should be empty
        debug_assert!(synced.assigned_cores[index].is_none());
    }

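    /// Transitions the core to searching, unless a sufficient number of
    /// workers are already searching.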
    pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
        debug_assert!(!core.is_searching);

        let num_searching = self.num_searching.load(Acquire);
        let num_idle = self.num_idle.load(Acquire);

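        // Only allow a new searcher while searchers make up less than half of
        // the active (non-idle) cores; beyond that, additional searchers
        // mostly contend with each other when trying to steal.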
        if 2 * num_searching >= self.num_cores - num_idle {
            return;
        }

        self.transition_worker_to_searching(core);
    }

    /// Needs to happen while synchronized in order to avoid races
    pub(super) fn transition_worker_to_searching_if_needed(
        &self,
        _synced: &mut Synced,
        core: &mut Core,
    ) -> bool {
        if self.needs_searching.load(Acquire) {
            // Needs to be called while holding the lock
            self.transition_worker_to_searching(core);
            true
        } else {
            false
        }
    }

    pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
        core.is_searching = true;
        self.num_searching.fetch_add(1, AcqRel);
        self.needs_searching.store(false, Release);
    }

    /// A lightweight transition from searching -> running.
    ///
    /// Returns `true` if this is the final searching worker. The caller
    /// **must** notify a new worker.
    pub(super) fn transition_worker_from_searching(&self) -> bool {
        let prev = self.num_searching.fetch_sub(1, AcqRel);
        debug_assert!(prev > 0);

        prev == 1
    }
}

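// Bits per chunk, and a mask for extracting the bit offset within a chunk.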
const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;

impl IdleMap {
    fn new(cores: &[Box<Core>]) -> IdleMap {
        let ret = IdleMap::new_n(num_chunks(cores.len()));
        ret.set_all(cores);

        ret
    }

    fn new_n(n: usize) -> IdleMap {
        let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect();
        IdleMap { chunks }
    }

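    /// Sets the bit for `index`. Note that the load/store pair is not an
    /// atomic read-modify-write; this is only safe because all mutations
    /// happen either at construction time or while the caller has exclusive
    /// access to the `Synced` state (the same applies to `unset`).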
    fn set(&self, index: usize) {
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev | mask;
        self.chunks[chunk].store(next, Release);
    }

    fn set_all(&self, cores: &[Box<Core>]) {
        for core in cores {
            self.set(core.index);
        }
    }

    fn unset(&self, index: usize) {
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev & !mask;
        self.chunks[chunk].store(next, Release);
    }

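    /// Debug helper: returns `true` if the bitmap exactly matches the given
    /// list of idle cores.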
    fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
        let expect = IdleMap::new_n(self.chunks.len());
        expect.set_all(idle_cores);

        for (i, chunk) in expect.chunks.iter().enumerate() {
            if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
                return false;
            }
        }

        true
    }
}

impl Snapshot {
    pub(crate) fn new(idle: &Idle) -> Snapshot {
        let chunks = vec![0; idle.idle_map.chunks.len()];
        let mut ret = Snapshot { chunks };
        ret.update(&idle.idle_map);
        ret
    }

    fn update(&mut self, idle_map: &IdleMap) {
        for i in 0..self.chunks.len() {
            self.chunks[i] = idle_map.chunks[i].load(Acquire);
        }
    }

    pub(super) fn is_idle(&self, index: usize) -> bool {
        let (chunk, mask) = index_to_mask(index);
        debug_assert!(
            chunk < self.chunks.len(),
            "index={}; chunks={}",
            index,
            self.chunks.len()
        );
        self.chunks[chunk] & mask == mask
    }
}

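/// Returns the number of `usize` chunks needed to track `max_cores` bits.
/// Always rounds up, over-allocating one chunk when `max_cores` is an exact
/// multiple of `BITS`.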
fn num_chunks(max_cores: usize) -> usize {
    (max_cores / BITS) + 1
}

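/// Maps a core index to its chunk and to the bit mask within that chunk.
/// For example, on a 64-bit target, index 70 maps to chunk `1` (`70 / 64`)
/// with mask `1 << 6` (`70 & 63`).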
fn index_to_mask(index: usize) -> (usize, usize) {
    let mask = 1 << (index & BIT_MASK);
    let chunk = index / BITS;

    (chunk, mask)
}

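/// Returns the number of cores currently assigned to workers, derived from
/// the spare capacity of `available_cores`. This assumes the vector is never
/// reallocated, so its capacity stays equal to the total number of cores.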
fn num_active_workers(synced: &Synced) -> usize {
    synced.available_cores.capacity() - synced.available_cores.len()
}