1 use std::boxed::Box;
2 use std::cell::UnsafeCell;
3 use std::collections::HashMap;
4 use std::fmt;
5 use std::marker::PhantomData;
6 use std::mem;
7 use std::ops::{Deref, DerefMut};
8 use std::panic::{RefUnwindSafe, UnwindSafe};
9 use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
10 use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
11 use std::thread::{self, ThreadId};
12 use std::vec::Vec;
13
14 use crate::sync::once_lock::OnceLock;
15 use crate::CachePadded;
16
/// How many shards every sharded lock is built from.
///
/// This has to stay a power of two: readers turn a thread index into a shard index by masking
/// with `shards.len() - 1`, which only works for power-of-two lengths.
const NUM_SHARDS: usize = 1 << 3;
19
/// One shard of a sharded lock: a reader-writer lock plus a slot for its write guard.
struct Shard {
    /// The reader-writer lock backing this shard. It protects no data of its own.
    lock: RwLock<()>,

    /// Storage for the write guard that keeps this shard locked.
    ///
    /// A write operation locks every shard and parks each resulting guard in this slot. All of
    /// the parked guards are dropped together, at the moment the big write guard is dropped.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,
}
31
32 /// A sharded reader-writer lock.
33 ///
34 /// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
35 /// are slower.
36 ///
37 /// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
38 /// single cache line. Read operations will pick one of the shards depending on the current thread
39 /// and lock it. Write operations need to lock all shards in succession.
40 ///
41 /// By splitting the lock into shards, concurrent read operations will in most cases choose
42 /// different shards and thus update different cache lines, which is good for scalability. However,
43 /// write operations need to do more work and are therefore slower than usual.
44 ///
45 /// The priority policy of the lock is dependent on the underlying operating system's
46 /// implementation, and this type does not guarantee that any particular policy will be used.
47 ///
48 /// # Poisoning
49 ///
50 /// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
51 /// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
52 /// read operation, the lock will not be poisoned.
53 ///
54 /// # Examples
55 ///
56 /// ```
57 /// use crossbeam_utils::sync::ShardedLock;
58 ///
59 /// let lock = ShardedLock::new(5);
60 ///
61 /// // Any number of read locks can be held at once.
62 /// {
63 /// let r1 = lock.read().unwrap();
64 /// let r2 = lock.read().unwrap();
65 /// assert_eq!(*r1, 5);
66 /// assert_eq!(*r2, 5);
67 /// } // Read locks are dropped at this point.
68 ///
69 /// // However, only one write lock may be held.
70 /// {
71 /// let mut w = lock.write().unwrap();
72 /// *w += 1;
73 /// assert_eq!(*w, 6);
74 /// } // Write lock is dropped here.
75 /// ```
76 ///
77 /// [`RwLock`]: std::sync::RwLock
78 pub struct ShardedLock<T: ?Sized> {
79 /// A list of locks protecting the internal data.
80 shards: Box<[CachePadded<Shard>]>,
81
82 /// The internal data.
83 value: UnsafeCell<T>,
84 }
85
86 unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
87 unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}
88
89 impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
90 impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
91
92 impl<T> ShardedLock<T> {
93 /// Creates a new sharded reader-writer lock.
94 ///
95 /// # Examples
96 ///
97 /// ```
98 /// use crossbeam_utils::sync::ShardedLock;
99 ///
100 /// let lock = ShardedLock::new(5);
101 /// ```
new(value: T) -> ShardedLock<T>102 pub fn new(value: T) -> ShardedLock<T> {
103 ShardedLock {
104 shards: (0..NUM_SHARDS)
105 .map(|_| {
106 CachePadded::new(Shard {
107 lock: RwLock::new(()),
108 write_guard: UnsafeCell::new(None),
109 })
110 })
111 .collect::<Box<[_]>>(),
112 value: UnsafeCell::new(value),
113 }
114 }
115
116 /// Consumes this lock, returning the underlying data.
117 ///
118 /// # Errors
119 ///
120 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
121 /// operation panics.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// use crossbeam_utils::sync::ShardedLock;
127 ///
128 /// let lock = ShardedLock::new(String::new());
129 /// {
130 /// let mut s = lock.write().unwrap();
131 /// *s = "modified".to_owned();
132 /// }
133 /// assert_eq!(lock.into_inner().unwrap(), "modified");
134 /// ```
into_inner(self) -> LockResult<T>135 pub fn into_inner(self) -> LockResult<T> {
136 let is_poisoned = self.is_poisoned();
137 let inner = self.value.into_inner();
138
139 if is_poisoned {
140 Err(PoisonError::new(inner))
141 } else {
142 Ok(inner)
143 }
144 }
145 }
146
147 impl<T: ?Sized> ShardedLock<T> {
148 /// Returns `true` if the lock is poisoned.
149 ///
150 /// If another thread can still access the lock, it may become poisoned at any time. A `false`
151 /// result should not be trusted without additional synchronization.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use crossbeam_utils::sync::ShardedLock;
157 /// use std::sync::Arc;
158 /// use std::thread;
159 ///
160 /// let lock = Arc::new(ShardedLock::new(0));
161 /// let c_lock = lock.clone();
162 ///
163 /// let _ = thread::spawn(move || {
164 /// let _lock = c_lock.write().unwrap();
165 /// panic!(); // the lock gets poisoned
166 /// }).join();
167 /// assert_eq!(lock.is_poisoned(), true);
168 /// ```
is_poisoned(&self) -> bool169 pub fn is_poisoned(&self) -> bool {
170 self.shards[0].lock.is_poisoned()
171 }
172
173 /// Returns a mutable reference to the underlying data.
174 ///
175 /// Since this call borrows the lock mutably, no actual locking needs to take place.
176 ///
177 /// # Errors
178 ///
179 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
180 /// operation panics.
181 ///
182 /// # Examples
183 ///
184 /// ```
185 /// use crossbeam_utils::sync::ShardedLock;
186 ///
187 /// let mut lock = ShardedLock::new(0);
188 /// *lock.get_mut().unwrap() = 10;
189 /// assert_eq!(*lock.read().unwrap(), 10);
190 /// ```
get_mut(&mut self) -> LockResult<&mut T>191 pub fn get_mut(&mut self) -> LockResult<&mut T> {
192 let is_poisoned = self.is_poisoned();
193 let inner = unsafe { &mut *self.value.get() };
194
195 if is_poisoned {
196 Err(PoisonError::new(inner))
197 } else {
198 Ok(inner)
199 }
200 }
201
202 /// Attempts to acquire this lock with shared read access.
203 ///
204 /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
205 /// is returned which will release the shared access when it is dropped. This method does not
206 /// provide any guarantees with respect to the ordering of whether contentious readers or
207 /// writers will acquire the lock first.
208 ///
209 /// # Errors
210 ///
211 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
212 /// operation panics.
213 ///
214 /// # Examples
215 ///
216 /// ```
217 /// use crossbeam_utils::sync::ShardedLock;
218 ///
219 /// let lock = ShardedLock::new(1);
220 ///
221 /// match lock.try_read() {
222 /// Ok(n) => assert_eq!(*n, 1),
223 /// Err(_) => unreachable!(),
224 /// };
225 /// ```
try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>>226 pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> {
227 // Take the current thread index and map it to a shard index. Thread indices will tend to
228 // distribute shards among threads equally, thus reducing contention due to read-locking.
229 let current_index = current_index().unwrap_or(0);
230 let shard_index = current_index & (self.shards.len() - 1);
231
232 match self.shards[shard_index].lock.try_read() {
233 Ok(guard) => Ok(ShardedLockReadGuard {
234 lock: self,
235 _guard: guard,
236 _marker: PhantomData,
237 }),
238 Err(TryLockError::Poisoned(err)) => {
239 let guard = ShardedLockReadGuard {
240 lock: self,
241 _guard: err.into_inner(),
242 _marker: PhantomData,
243 };
244 Err(TryLockError::Poisoned(PoisonError::new(guard)))
245 }
246 Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
247 }
248 }
249
250 /// Locks with shared read access, blocking the current thread until it can be acquired.
251 ///
252 /// The calling thread will be blocked until there are no more writers which hold the lock.
253 /// There may be other readers currently inside the lock when this method returns. This method
254 /// does not provide any guarantees with respect to the ordering of whether contentious readers
255 /// or writers will acquire the lock first.
256 ///
257 /// Returns a guard which will release the shared access when dropped.
258 ///
259 /// # Errors
260 ///
261 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
262 /// operation panics.
263 ///
264 /// # Panics
265 ///
266 /// This method might panic when called if the lock is already held by the current thread.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use crossbeam_utils::sync::ShardedLock;
272 /// use std::sync::Arc;
273 /// use std::thread;
274 ///
275 /// let lock = Arc::new(ShardedLock::new(1));
276 /// let c_lock = lock.clone();
277 ///
278 /// let n = lock.read().unwrap();
279 /// assert_eq!(*n, 1);
280 ///
281 /// thread::spawn(move || {
282 /// let r = c_lock.read();
283 /// assert!(r.is_ok());
284 /// }).join().unwrap();
285 /// ```
read(&self) -> LockResult<ShardedLockReadGuard<'_, T>>286 pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> {
287 // Take the current thread index and map it to a shard index. Thread indices will tend to
288 // distribute shards among threads equally, thus reducing contention due to read-locking.
289 let current_index = current_index().unwrap_or(0);
290 let shard_index = current_index & (self.shards.len() - 1);
291
292 match self.shards[shard_index].lock.read() {
293 Ok(guard) => Ok(ShardedLockReadGuard {
294 lock: self,
295 _guard: guard,
296 _marker: PhantomData,
297 }),
298 Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
299 lock: self,
300 _guard: err.into_inner(),
301 _marker: PhantomData,
302 })),
303 }
304 }
305
306 /// Attempts to acquire this lock with exclusive write access.
307 ///
308 /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
309 /// is returned which will release the exclusive access when it is dropped. This method does
310 /// not provide any guarantees with respect to the ordering of whether contentious readers or
311 /// writers will acquire the lock first.
312 ///
313 /// # Errors
314 ///
315 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
316 /// operation panics.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// use crossbeam_utils::sync::ShardedLock;
322 ///
323 /// let lock = ShardedLock::new(1);
324 ///
325 /// let n = lock.read().unwrap();
326 /// assert_eq!(*n, 1);
327 ///
328 /// assert!(lock.try_write().is_err());
329 /// ```
try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>>330 pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> {
331 let mut poisoned = false;
332 let mut blocked = None;
333
334 // Write-lock each shard in succession.
335 for (i, shard) in self.shards.iter().enumerate() {
336 let guard = match shard.lock.try_write() {
337 Ok(guard) => guard,
338 Err(TryLockError::Poisoned(err)) => {
339 poisoned = true;
340 err.into_inner()
341 }
342 Err(TryLockError::WouldBlock) => {
343 blocked = Some(i);
344 break;
345 }
346 };
347
348 // Store the guard into the shard.
349 unsafe {
350 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
351 let dest: *mut _ = shard.write_guard.get();
352 *dest = Some(guard);
353 }
354 }
355
356 if let Some(i) = blocked {
357 // Unlock the shards in reverse order of locking.
358 for shard in self.shards[0..i].iter().rev() {
359 unsafe {
360 let dest: *mut _ = shard.write_guard.get();
361 let guard = (*dest).take();
362 drop(guard);
363 }
364 }
365 Err(TryLockError::WouldBlock)
366 } else if poisoned {
367 let guard = ShardedLockWriteGuard {
368 lock: self,
369 _marker: PhantomData,
370 };
371 Err(TryLockError::Poisoned(PoisonError::new(guard)))
372 } else {
373 Ok(ShardedLockWriteGuard {
374 lock: self,
375 _marker: PhantomData,
376 })
377 }
378 }
379
380 /// Locks with exclusive write access, blocking the current thread until it can be acquired.
381 ///
382 /// The calling thread will be blocked until there are no more writers which hold the lock.
383 /// There may be other readers currently inside the lock when this method returns. This method
384 /// does not provide any guarantees with respect to the ordering of whether contentious readers
385 /// or writers will acquire the lock first.
386 ///
387 /// Returns a guard which will release the exclusive access when dropped.
388 ///
389 /// # Errors
390 ///
391 /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
392 /// operation panics.
393 ///
394 /// # Panics
395 ///
396 /// This method might panic when called if the lock is already held by the current thread.
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// use crossbeam_utils::sync::ShardedLock;
402 ///
403 /// let lock = ShardedLock::new(1);
404 ///
405 /// let mut n = lock.write().unwrap();
406 /// *n = 2;
407 ///
408 /// assert!(lock.try_read().is_err());
409 /// ```
write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>>410 pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> {
411 let mut poisoned = false;
412
413 // Write-lock each shard in succession.
414 for shard in self.shards.iter() {
415 let guard = match shard.lock.write() {
416 Ok(guard) => guard,
417 Err(err) => {
418 poisoned = true;
419 err.into_inner()
420 }
421 };
422
423 // Store the guard into the shard.
424 unsafe {
425 let guard: RwLockWriteGuard<'_, ()> = guard;
426 let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
427 let dest: *mut _ = shard.write_guard.get();
428 *dest = Some(guard);
429 }
430 }
431
432 if poisoned {
433 Err(PoisonError::new(ShardedLockWriteGuard {
434 lock: self,
435 _marker: PhantomData,
436 }))
437 } else {
438 Ok(ShardedLockWriteGuard {
439 lock: self,
440 _marker: PhantomData,
441 })
442 }
443 }
444 }
445
446 impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result447 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448 match self.try_read() {
449 Ok(guard) => f
450 .debug_struct("ShardedLock")
451 .field("data", &&*guard)
452 .finish(),
453 Err(TryLockError::Poisoned(err)) => f
454 .debug_struct("ShardedLock")
455 .field("data", &&**err.get_ref())
456 .finish(),
457 Err(TryLockError::WouldBlock) => {
458 struct LockedPlaceholder;
459 impl fmt::Debug for LockedPlaceholder {
460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461 f.write_str("<locked>")
462 }
463 }
464 f.debug_struct("ShardedLock")
465 .field("data", &LockedPlaceholder)
466 .finish()
467 }
468 }
469 }
470 }
471
472 impl<T: Default> Default for ShardedLock<T> {
default() -> ShardedLock<T>473 fn default() -> ShardedLock<T> {
474 ShardedLock::new(Default::default())
475 }
476 }
477
478 impl<T> From<T> for ShardedLock<T> {
from(t: T) -> Self479 fn from(t: T) -> Self {
480 ShardedLock::new(t)
481 }
482 }
483
484 /// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
485 #[clippy::has_significant_drop]
486 pub struct ShardedLockReadGuard<'a, T: ?Sized> {
487 lock: &'a ShardedLock<T>,
488 _guard: RwLockReadGuard<'a, ()>,
489 _marker: PhantomData<RwLockReadGuard<'a, T>>,
490 }
491
492 unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {}
493
494 impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> {
495 type Target = T;
496
deref(&self) -> &T497 fn deref(&self) -> &T {
498 unsafe { &*self.lock.value.get() }
499 }
500 }
501
502 impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result503 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
504 f.debug_struct("ShardedLockReadGuard")
505 .field("lock", &self.lock)
506 .finish()
507 }
508 }
509
510 impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result511 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
512 (**self).fmt(f)
513 }
514 }
515
516 /// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
517 #[clippy::has_significant_drop]
518 pub struct ShardedLockWriteGuard<'a, T: ?Sized> {
519 lock: &'a ShardedLock<T>,
520 _marker: PhantomData<RwLockWriteGuard<'a, T>>,
521 }
522
523 unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {}
524
525 impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
drop(&mut self)526 fn drop(&mut self) {
527 // Unlock the shards in reverse order of locking.
528 for shard in self.lock.shards.iter().rev() {
529 unsafe {
530 let dest: *mut _ = shard.write_guard.get();
531 let guard = (*dest).take();
532 drop(guard);
533 }
534 }
535 }
536 }
537
538 impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result539 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
540 f.debug_struct("ShardedLockWriteGuard")
541 .field("lock", &self.lock)
542 .finish()
543 }
544 }
545
546 impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548 (**self).fmt(f)
549 }
550 }
551
552 impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> {
553 type Target = T;
554
deref(&self) -> &T555 fn deref(&self) -> &T {
556 unsafe { &*self.lock.value.get() }
557 }
558 }
559
560 impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> {
deref_mut(&mut self) -> &mut T561 fn deref_mut(&mut self) -> &mut T {
562 unsafe { &mut *self.lock.value.get() }
563 }
564 }
565
566 /// Returns a `usize` that identifies the current thread.
567 ///
568 /// Each thread is associated with an 'index'. While there are no particular guarantees, indices
569 /// usually tend to be consecutive numbers between 0 and the number of running threads.
570 ///
571 /// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
572 /// tearing down.
573 #[inline]
current_index() -> Option<usize>574 fn current_index() -> Option<usize> {
575 REGISTRATION.try_with(|reg| reg.index).ok()
576 }
577
/// Global bookkeeping of which threads are registered and which index each one holds.
struct ThreadIndices {
    /// Thread index of every registered thread, keyed by its `ThreadId`.
    mapping: HashMap<ThreadId, usize>,

    /// Indices that have been handed back and can be given out again.
    free_list: Vec<usize>,

    /// The index to allocate next when the free list has nothing to offer.
    next_index: usize,
}
589
thread_indices() -> &'static Mutex<ThreadIndices>590 fn thread_indices() -> &'static Mutex<ThreadIndices> {
591 static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new();
592 fn init() -> Mutex<ThreadIndices> {
593 Mutex::new(ThreadIndices {
594 mapping: HashMap::new(),
595 free_list: Vec::new(),
596 next_index: 0,
597 })
598 }
599 THREAD_INDICES.get_or_init(init)
600 }
601
/// Records that a thread has been registered under a particular index.
///
/// Dropping the registration unregisters the thread and frees the reserved index.
struct Registration {
    /// The index reserved for the thread.
    index: usize,
    /// Identifier of the thread that owns this registration.
    thread_id: ThreadId,
}
609
610 impl Drop for Registration {
drop(&mut self)611 fn drop(&mut self) {
612 let mut indices = thread_indices().lock().unwrap();
613 indices.mapping.remove(&self.thread_id);
614 indices.free_list.push(self.index);
615 }
616 }
617
618 std::thread_local! {
619 static REGISTRATION: Registration = {
620 let thread_id = thread::current().id();
621 let mut indices = thread_indices().lock().unwrap();
622
623 let index = match indices.free_list.pop() {
624 Some(i) => i,
625 None => {
626 let i = indices.next_index;
627 indices.next_index += 1;
628 i
629 }
630 };
631 indices.mapping.insert(thread_id, index);
632
633 Registration {
634 index,
635 thread_id,
636 }
637 };
638 }
639