1 use std::boxed::Box;
2 use std::cell::UnsafeCell;
3 use std::collections::HashMap;
4 use std::fmt;
5 use std::marker::PhantomData;
6 use std::mem;
7 use std::ops::{Deref, DerefMut};
8 use std::panic::{RefUnwindSafe, UnwindSafe};
9 use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
10 use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
11 use std::thread::{self, ThreadId};
12 use std::vec::Vec;
13 
14 use crate::sync::once_lock::OnceLock;
15 use crate::CachePadded;
16 
/// The number of shards per sharded lock. Must be a power of two.
///
/// Readers pick their shard with `index & (shards.len() - 1)`; that mask only reaches every shard
/// when the shard count is a power of two.
const NUM_SHARDS: usize = 8;
19 
/// A shard containing a single reader-writer lock.
struct Shard {
    /// The write-guard keeping this shard locked.
    ///
    /// Write operations will lock each shard and store the guard here. These guards get dropped at
    /// the same time the big guard is dropped.
    ///
    /// The `'static` lifetime is a placeholder: the stored guard really borrows `lock` of this
    /// very shard.
    ///
    /// This field is intentionally declared before `lock`. Struct fields are dropped in
    /// declaration order, so a guard that is still stored here when the shard is destroyed (for
    /// example because the big write guard was leaked with `mem::forget`) releases `lock` before
    /// `lock` itself is dropped, instead of unlocking an already-dropped `RwLock`.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,

    /// The inner reader-writer lock.
    lock: RwLock<()>,
}
31 
32 /// A sharded reader-writer lock.
33 ///
34 /// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
35 /// are slower.
36 ///
37 /// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
38 /// single cache line. Read operations will pick one of the shards depending on the current thread
39 /// and lock it. Write operations need to lock all shards in succession.
40 ///
41 /// By splitting the lock into shards, concurrent read operations will in most cases choose
42 /// different shards and thus update different cache lines, which is good for scalability. However,
43 /// write operations need to do more work and are therefore slower than usual.
44 ///
45 /// The priority policy of the lock is dependent on the underlying operating system's
46 /// implementation, and this type does not guarantee that any particular policy will be used.
47 ///
48 /// # Poisoning
49 ///
50 /// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
51 /// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
52 /// read operation, the lock will not be poisoned.
53 ///
54 /// # Examples
55 ///
56 /// ```
57 /// use crossbeam_utils::sync::ShardedLock;
58 ///
59 /// let lock = ShardedLock::new(5);
60 ///
61 /// // Any number of read locks can be held at once.
62 /// {
63 ///     let r1 = lock.read().unwrap();
64 ///     let r2 = lock.read().unwrap();
65 ///     assert_eq!(*r1, 5);
66 ///     assert_eq!(*r2, 5);
67 /// } // Read locks are dropped at this point.
68 ///
69 /// // However, only one write lock may be held.
70 /// {
71 ///     let mut w = lock.write().unwrap();
72 ///     *w += 1;
73 ///     assert_eq!(*w, 6);
74 /// } // Write lock is dropped here.
75 /// ```
76 ///
77 /// [`RwLock`]: std::sync::RwLock
pub struct ShardedLock<T: ?Sized> {
    /// A list of locks protecting the internal data.
    ///
    /// `new` creates `NUM_SHARDS` of them. A reader locks the one shard selected by the current
    /// thread's index; a writer locks every shard, in order.
    shards: Box<[CachePadded<Shard>]>,

    /// The internal data.
    ///
    /// Wrapped in an `UnsafeCell` because the guards hand out `&T` / `&mut T` while only holding a
    /// shared reference to the lock.
    value: UnsafeCell<T>,
}
85 
// SAFETY: sending a `ShardedLock<T>` to another thread sends the `T` it owns, hence `T: Send`.
// NOTE(review): each shard can also hold an `RwLockWriteGuard` (which is `!Send`) in its
// `write_guard` slot; this impl relies on those slots being empty whenever the lock is moved by
// value — confirm.
unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
// SAFETY: through a shared `&ShardedLock<T>`, read guards expose `&T` to several threads at once
// (`T: Sync`) and a write guard exposes `&mut T` to whichever thread holds it (`T: Send`).
unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}
88 
// A panic while a write operation is in progress poisons the lock (see the "Poisoning" section of
// the `ShardedLock` docs), so later users get an error rather than silently observing a value
// that may be in a broken state. Presumably this is why the lock is declared unwind-safe for
// every `T`.
impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
91 
92 impl<T> ShardedLock<T> {
93     /// Creates a new sharded reader-writer lock.
94     ///
95     /// # Examples
96     ///
97     /// ```
98     /// use crossbeam_utils::sync::ShardedLock;
99     ///
100     /// let lock = ShardedLock::new(5);
101     /// ```
new(value: T) -> ShardedLock<T>102     pub fn new(value: T) -> ShardedLock<T> {
103         ShardedLock {
104             shards: (0..NUM_SHARDS)
105                 .map(|_| {
106                     CachePadded::new(Shard {
107                         lock: RwLock::new(()),
108                         write_guard: UnsafeCell::new(None),
109                     })
110                 })
111                 .collect::<Box<[_]>>(),
112             value: UnsafeCell::new(value),
113         }
114     }
115 
116     /// Consumes this lock, returning the underlying data.
117     ///
118     /// # Errors
119     ///
120     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
121     /// operation panics.
122     ///
123     /// # Examples
124     ///
125     /// ```
126     /// use crossbeam_utils::sync::ShardedLock;
127     ///
128     /// let lock = ShardedLock::new(String::new());
129     /// {
130     ///     let mut s = lock.write().unwrap();
131     ///     *s = "modified".to_owned();
132     /// }
133     /// assert_eq!(lock.into_inner().unwrap(), "modified");
134     /// ```
into_inner(self) -> LockResult<T>135     pub fn into_inner(self) -> LockResult<T> {
136         let is_poisoned = self.is_poisoned();
137         let inner = self.value.into_inner();
138 
139         if is_poisoned {
140             Err(PoisonError::new(inner))
141         } else {
142             Ok(inner)
143         }
144     }
145 }
146 
147 impl<T: ?Sized> ShardedLock<T> {
148     /// Returns `true` if the lock is poisoned.
149     ///
150     /// If another thread can still access the lock, it may become poisoned at any time. A `false`
151     /// result should not be trusted without additional synchronization.
152     ///
153     /// # Examples
154     ///
155     /// ```
156     /// use crossbeam_utils::sync::ShardedLock;
157     /// use std::sync::Arc;
158     /// use std::thread;
159     ///
160     /// let lock = Arc::new(ShardedLock::new(0));
161     /// let c_lock = lock.clone();
162     ///
163     /// let _ = thread::spawn(move || {
164     ///     let _lock = c_lock.write().unwrap();
165     ///     panic!(); // the lock gets poisoned
166     /// }).join();
167     /// assert_eq!(lock.is_poisoned(), true);
168     /// ```
is_poisoned(&self) -> bool169     pub fn is_poisoned(&self) -> bool {
170         self.shards[0].lock.is_poisoned()
171     }
172 
173     /// Returns a mutable reference to the underlying data.
174     ///
175     /// Since this call borrows the lock mutably, no actual locking needs to take place.
176     ///
177     /// # Errors
178     ///
179     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
180     /// operation panics.
181     ///
182     /// # Examples
183     ///
184     /// ```
185     /// use crossbeam_utils::sync::ShardedLock;
186     ///
187     /// let mut lock = ShardedLock::new(0);
188     /// *lock.get_mut().unwrap() = 10;
189     /// assert_eq!(*lock.read().unwrap(), 10);
190     /// ```
get_mut(&mut self) -> LockResult<&mut T>191     pub fn get_mut(&mut self) -> LockResult<&mut T> {
192         let is_poisoned = self.is_poisoned();
193         let inner = unsafe { &mut *self.value.get() };
194 
195         if is_poisoned {
196             Err(PoisonError::new(inner))
197         } else {
198             Ok(inner)
199         }
200     }
201 
202     /// Attempts to acquire this lock with shared read access.
203     ///
204     /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
205     /// is returned which will release the shared access when it is dropped. This method does not
206     /// provide any guarantees with respect to the ordering of whether contentious readers or
207     /// writers will acquire the lock first.
208     ///
209     /// # Errors
210     ///
211     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
212     /// operation panics.
213     ///
214     /// # Examples
215     ///
216     /// ```
217     /// use crossbeam_utils::sync::ShardedLock;
218     ///
219     /// let lock = ShardedLock::new(1);
220     ///
221     /// match lock.try_read() {
222     ///     Ok(n) => assert_eq!(*n, 1),
223     ///     Err(_) => unreachable!(),
224     /// };
225     /// ```
try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>>226     pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> {
227         // Take the current thread index and map it to a shard index. Thread indices will tend to
228         // distribute shards among threads equally, thus reducing contention due to read-locking.
229         let current_index = current_index().unwrap_or(0);
230         let shard_index = current_index & (self.shards.len() - 1);
231 
232         match self.shards[shard_index].lock.try_read() {
233             Ok(guard) => Ok(ShardedLockReadGuard {
234                 lock: self,
235                 _guard: guard,
236                 _marker: PhantomData,
237             }),
238             Err(TryLockError::Poisoned(err)) => {
239                 let guard = ShardedLockReadGuard {
240                     lock: self,
241                     _guard: err.into_inner(),
242                     _marker: PhantomData,
243                 };
244                 Err(TryLockError::Poisoned(PoisonError::new(guard)))
245             }
246             Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
247         }
248     }
249 
250     /// Locks with shared read access, blocking the current thread until it can be acquired.
251     ///
252     /// The calling thread will be blocked until there are no more writers which hold the lock.
253     /// There may be other readers currently inside the lock when this method returns. This method
254     /// does not provide any guarantees with respect to the ordering of whether contentious readers
255     /// or writers will acquire the lock first.
256     ///
257     /// Returns a guard which will release the shared access when dropped.
258     ///
259     /// # Errors
260     ///
261     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
262     /// operation panics.
263     ///
264     /// # Panics
265     ///
266     /// This method might panic when called if the lock is already held by the current thread.
267     ///
268     /// # Examples
269     ///
270     /// ```
271     /// use crossbeam_utils::sync::ShardedLock;
272     /// use std::sync::Arc;
273     /// use std::thread;
274     ///
275     /// let lock = Arc::new(ShardedLock::new(1));
276     /// let c_lock = lock.clone();
277     ///
278     /// let n = lock.read().unwrap();
279     /// assert_eq!(*n, 1);
280     ///
281     /// thread::spawn(move || {
282     ///     let r = c_lock.read();
283     ///     assert!(r.is_ok());
284     /// }).join().unwrap();
285     /// ```
read(&self) -> LockResult<ShardedLockReadGuard<'_, T>>286     pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> {
287         // Take the current thread index and map it to a shard index. Thread indices will tend to
288         // distribute shards among threads equally, thus reducing contention due to read-locking.
289         let current_index = current_index().unwrap_or(0);
290         let shard_index = current_index & (self.shards.len() - 1);
291 
292         match self.shards[shard_index].lock.read() {
293             Ok(guard) => Ok(ShardedLockReadGuard {
294                 lock: self,
295                 _guard: guard,
296                 _marker: PhantomData,
297             }),
298             Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
299                 lock: self,
300                 _guard: err.into_inner(),
301                 _marker: PhantomData,
302             })),
303         }
304     }
305 
306     /// Attempts to acquire this lock with exclusive write access.
307     ///
308     /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
309     /// is returned which will release the exclusive access when it is dropped. This method does
310     /// not provide any guarantees with respect to the ordering of whether contentious readers or
311     /// writers will acquire the lock first.
312     ///
313     /// # Errors
314     ///
315     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
316     /// operation panics.
317     ///
318     /// # Examples
319     ///
320     /// ```
321     /// use crossbeam_utils::sync::ShardedLock;
322     ///
323     /// let lock = ShardedLock::new(1);
324     ///
325     /// let n = lock.read().unwrap();
326     /// assert_eq!(*n, 1);
327     ///
328     /// assert!(lock.try_write().is_err());
329     /// ```
    pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> {
        // Set when at least one shard reports poisoning; that shard is still locked and kept.
        let mut poisoned = false;
        // Index of the first shard that could not be locked without blocking, if any.
        let mut blocked = None;

        // Write-lock each shard in succession.
        for (i, shard) in self.shards.iter().enumerate() {
            let guard = match shard.lock.try_write() {
                Ok(guard) => guard,
                Err(TryLockError::Poisoned(err)) => {
                    // The poisoned shard has been locked nonetheless: keep its guard and go on.
                    poisoned = true;
                    err.into_inner()
                }
                Err(TryLockError::WouldBlock) => {
                    // Shards `0..i` were locked by this call and are rolled back below.
                    blocked = Some(i);
                    break;
                }
            };

            // Store the guard into the shard.
            //
            // The transmute only changes the guard's lifetime parameter to `'static` so that it
            // can be parked next to the lock it refers to. The slot is written while this thread
            // holds the shard's write lock; the guard is taken out again either by the rollback
            // below or by `ShardedLockWriteGuard::drop`.
            unsafe {
                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
                let dest: *mut _ = shard.write_guard.get();
                *dest = Some(guard);
            }
        }

        if let Some(i) = blocked {
            // Unlock the shards in reverse order of locking.
            //
            // Only shards `0..i` hold a guard stored by this call; shard `i` itself was never
            // locked.
            for shard in self.shards[0..i].iter().rev() {
                unsafe {
                    let dest: *mut _ = shard.write_guard.get();
                    let guard = (*dest).take();
                    drop(guard);
                }
            }
            Err(TryLockError::WouldBlock)
        } else if poisoned {
            // All shards are locked; hand the guard back inside the poison error so the caller
            // can still reach the data.
            let guard = ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            };
            Err(TryLockError::Poisoned(PoisonError::new(guard)))
        } else {
            Ok(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            })
        }
    }
379 
380     /// Locks with exclusive write access, blocking the current thread until it can be acquired.
381     ///
382     /// The calling thread will be blocked until there are no more writers which hold the lock.
383     /// There may be other readers currently inside the lock when this method returns. This method
384     /// does not provide any guarantees with respect to the ordering of whether contentious readers
385     /// or writers will acquire the lock first.
386     ///
387     /// Returns a guard which will release the exclusive access when dropped.
388     ///
389     /// # Errors
390     ///
391     /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
392     /// operation panics.
393     ///
394     /// # Panics
395     ///
396     /// This method might panic when called if the lock is already held by the current thread.
397     ///
398     /// # Examples
399     ///
400     /// ```
401     /// use crossbeam_utils::sync::ShardedLock;
402     ///
403     /// let lock = ShardedLock::new(1);
404     ///
405     /// let mut n = lock.write().unwrap();
406     /// *n = 2;
407     ///
408     /// assert!(lock.try_read().is_err());
409     /// ```
    pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> {
        // Set when at least one shard reports poisoning; that shard is still locked and kept.
        let mut poisoned = false;

        // Write-lock each shard in succession.
        for shard in self.shards.iter() {
            let guard = match shard.lock.write() {
                Ok(guard) => guard,
                Err(err) => {
                    // The poisoned shard has been locked nonetheless: keep its guard and go on.
                    poisoned = true;
                    err.into_inner()
                }
            };

            // Store the guard into the shard.
            //
            // The transmute only changes the guard's lifetime parameter to `'static` so that it
            // can be parked next to the lock it refers to. The slot is written while this thread
            // holds the shard's write lock; the guard is taken out again by
            // `ShardedLockWriteGuard::drop`.
            unsafe {
                let guard: RwLockWriteGuard<'_, ()> = guard;
                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
                let dest: *mut _ = shard.write_guard.get();
                *dest = Some(guard);
            }
        }

        // Either way every shard is now locked; a poisoned lock hands the guard back inside the
        // error so the caller can still reach the data.
        if poisoned {
            Err(PoisonError::new(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            }))
        } else {
            Ok(ShardedLockWriteGuard {
                lock: self,
                _marker: PhantomData,
            })
        }
    }
444 }
445 
446 impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result447     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448         match self.try_read() {
449             Ok(guard) => f
450                 .debug_struct("ShardedLock")
451                 .field("data", &&*guard)
452                 .finish(),
453             Err(TryLockError::Poisoned(err)) => f
454                 .debug_struct("ShardedLock")
455                 .field("data", &&**err.get_ref())
456                 .finish(),
457             Err(TryLockError::WouldBlock) => {
458                 struct LockedPlaceholder;
459                 impl fmt::Debug for LockedPlaceholder {
460                     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461                         f.write_str("<locked>")
462                     }
463                 }
464                 f.debug_struct("ShardedLock")
465                     .field("data", &LockedPlaceholder)
466                     .finish()
467             }
468         }
469     }
470 }
471 
472 impl<T: Default> Default for ShardedLock<T> {
default() -> ShardedLock<T>473     fn default() -> ShardedLock<T> {
474         ShardedLock::new(Default::default())
475     }
476 }
477 
478 impl<T> From<T> for ShardedLock<T> {
from(t: T) -> Self479     fn from(t: T) -> Self {
480         ShardedLock::new(t)
481     }
482 }
483 
/// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
#[clippy::has_significant_drop]
pub struct ShardedLockReadGuard<'a, T: ?Sized> {
    /// The lock this guard belongs to; `Deref` reaches the protected value through it.
    lock: &'a ShardedLock<T>,
    /// Read guard of the one shard that was locked for this reader. It is never read; dropping
    /// it together with this struct is what releases the shard.
    _guard: RwLockReadGuard<'a, ()>,
    /// Zero-sized marker; presumably present so that this type is treated like a standard
    /// `RwLockReadGuard<'a, T>` with respect to auto traits and variance.
    _marker: PhantomData<RwLockReadGuard<'a, T>>,
}
491 
// SAFETY: a shared reference to the guard only exposes `&T` (through `Deref`), so `T: Sync` is
// sufficient. Without this impl the `lock` field would additionally demand `T: Send`, because
// `ShardedLock<T>: Sync` requires `T: Send + Sync`.
unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {}
493 
494 impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> {
495     type Target = T;
496 
deref(&self) -> &T497     fn deref(&self) -> &T {
498         unsafe { &*self.lock.value.get() }
499     }
500 }
501 
502 impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result503     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
504         f.debug_struct("ShardedLockReadGuard")
505             .field("lock", &self.lock)
506             .finish()
507     }
508 }
509 
510 impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result511     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
512         (**self).fmt(f)
513     }
514 }
515 
/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
#[clippy::has_significant_drop]
pub struct ShardedLockWriteGuard<'a, T: ?Sized> {
    /// The lock this guard belongs to. The per-shard write guards are not stored in this struct:
    /// they are parked in each shard's `write_guard` slot and taken out again by `Drop`.
    lock: &'a ShardedLock<T>,
    /// Zero-sized marker; presumably present so that this type is treated like a standard
    /// `RwLockWriteGuard<'a, T>` with respect to auto traits and variance.
    _marker: PhantomData<RwLockWriteGuard<'a, T>>,
}
522 
// SAFETY: a shared reference to the guard only exposes `&T` (through `Deref`; `DerefMut` needs
// `&mut` access to the guard), so `T: Sync` is sufficient.
unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {}
524 
525 impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
drop(&mut self)526     fn drop(&mut self) {
527         // Unlock the shards in reverse order of locking.
528         for shard in self.lock.shards.iter().rev() {
529             unsafe {
530                 let dest: *mut _ = shard.write_guard.get();
531                 let guard = (*dest).take();
532                 drop(guard);
533             }
534         }
535     }
536 }
537 
538 impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result539     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
540         f.debug_struct("ShardedLockWriteGuard")
541             .field("lock", &self.lock)
542             .finish()
543     }
544 }
545 
546 impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result547     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548         (**self).fmt(f)
549     }
550 }
551 
552 impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> {
553     type Target = T;
554 
deref(&self) -> &T555     fn deref(&self) -> &T {
556         unsafe { &*self.lock.value.get() }
557     }
558 }
559 
560 impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> {
deref_mut(&mut self) -> &mut T561     fn deref_mut(&mut self) -> &mut T {
562         unsafe { &mut *self.lock.value.get() }
563     }
564 }
565 
566 /// Returns a `usize` that identifies the current thread.
567 ///
568 /// Each thread is associated with an 'index'. While there are no particular guarantees, indices
569 /// usually tend to be consecutive numbers between 0 and the number of running threads.
570 ///
571 /// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
572 /// tearing down.
573 #[inline]
current_index() -> Option<usize>574 fn current_index() -> Option<usize> {
575     REGISTRATION.try_with(|reg| reg.index).ok()
576 }
577 
/// The global registry keeping track of registered threads and indices.
///
/// A single instance lives behind the mutex returned by `thread_indices`.
struct ThreadIndices {
    /// Mapping from `ThreadId` to thread index.
    ///
    /// An entry is inserted when a thread's `REGISTRATION` is initialized and removed again when
    /// that registration is dropped.
    mapping: HashMap<ThreadId, usize>,

    /// A list of free indices.
    ///
    /// Indices are pushed here by dropped registrations and popped (most recently freed first)
    /// by new registrations before any fresh index is allocated.
    free_list: Vec<usize>,

    /// The next index to allocate if the free list is empty.
    next_index: usize,
}
589 
thread_indices() -> &'static Mutex<ThreadIndices>590 fn thread_indices() -> &'static Mutex<ThreadIndices> {
591     static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new();
592     fn init() -> Mutex<ThreadIndices> {
593         Mutex::new(ThreadIndices {
594             mapping: HashMap::new(),
595             free_list: Vec::new(),
596             next_index: 0,
597         })
598     }
599     THREAD_INDICES.get_or_init(init)
600 }
601 
/// A registration of a thread with an index.
///
/// One instance is created per thread by the `REGISTRATION` thread-local. When dropped,
/// unregisters the thread and frees the reserved index.
struct Registration {
    /// The index reserved for this thread; readers mask it to choose their shard.
    index: usize,
    /// Id of the registered thread, used as the key in `ThreadIndices::mapping`.
    thread_id: ThreadId,
}
609 
610 impl Drop for Registration {
drop(&mut self)611     fn drop(&mut self) {
612         let mut indices = thread_indices().lock().unwrap();
613         indices.mapping.remove(&self.thread_id);
614         indices.free_list.push(self.index);
615     }
616 }
617 
618 std::thread_local! {
619     static REGISTRATION: Registration = {
620         let thread_id = thread::current().id();
621         let mut indices = thread_indices().lock().unwrap();
622 
623         let index = match indices.free_list.pop() {
624             Some(i) => i,
625             None => {
626                 let i = indices.next_index;
627                 indices.next_index += 1;
628                 i
629             }
630         };
631         indices.mapping.insert(thread_id, index);
632 
633         Registration {
634             index,
635             thread_id,
636         }
637     };
638 }
639