1 use std::cell::UnsafeCell; 2 use std::marker::PhantomData; 3 use std::ops::{Deref, DerefMut}; 4 use std::pin::Pin; 5 use std::sync::atomic::{AtomicUsize, Ordering}; 6 use std::sync::{Arc, Mutex as StdMutex}; 7 use std::{fmt, mem}; 8 9 use slab::Slab; 10 11 use futures_core::future::{FusedFuture, Future}; 12 use futures_core::task::{Context, Poll, Waker}; 13 14 /// A futures-aware mutex. 15 /// 16 /// # Fairness 17 /// 18 /// This mutex provides no fairness guarantees. Tasks may not acquire the mutex 19 /// in the order that they requested the lock, and it's possible for a single task 20 /// which repeatedly takes the lock to starve other tasks, which may be left waiting 21 /// indefinitely. 22 pub struct Mutex<T: ?Sized> { 23 state: AtomicUsize, 24 waiters: StdMutex<Slab<Waiter>>, 25 value: UnsafeCell<T>, 26 } 27 28 impl<T: ?Sized> fmt::Debug for Mutex<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 let state = self.state.load(Ordering::SeqCst); 31 f.debug_struct("Mutex") 32 .field("is_locked", &((state & IS_LOCKED) != 0)) 33 .field("has_waiters", &((state & HAS_WAITERS) != 0)) 34 .finish() 35 } 36 } 37 38 impl<T> From<T> for Mutex<T> { from(t: T) -> Self39 fn from(t: T) -> Self { 40 Self::new(t) 41 } 42 } 43 44 impl<T: Default> Default for Mutex<T> { default() -> Self45 fn default() -> Self { 46 Self::new(Default::default()) 47 } 48 } 49 50 enum Waiter { 51 Waiting(Waker), 52 Woken, 53 } 54 55 impl Waiter { register(&mut self, waker: &Waker)56 fn register(&mut self, waker: &Waker) { 57 match self { 58 Self::Waiting(w) if waker.will_wake(w) => {} 59 _ => *self = Self::Waiting(waker.clone()), 60 } 61 } 62 wake(&mut self)63 fn wake(&mut self) { 64 match mem::replace(self, Self::Woken) { 65 Self::Waiting(waker) => waker.wake(), 66 Self::Woken => {} 67 } 68 } 69 } 70 71 const IS_LOCKED: usize = 1 << 0; 72 const HAS_WAITERS: usize = 1 << 1; 73 74 impl<T> Mutex<T> { 75 /// Creates a new 
futures-aware mutex. new(t: T) -> Self76 pub fn new(t: T) -> Self { 77 Self { 78 state: AtomicUsize::new(0), 79 waiters: StdMutex::new(Slab::new()), 80 value: UnsafeCell::new(t), 81 } 82 } 83 84 /// Consumes this mutex, returning the underlying data. 85 /// 86 /// # Examples 87 /// 88 /// ``` 89 /// use futures::lock::Mutex; 90 /// 91 /// let mutex = Mutex::new(0); 92 /// assert_eq!(mutex.into_inner(), 0); 93 /// ``` into_inner(self) -> T94 pub fn into_inner(self) -> T { 95 self.value.into_inner() 96 } 97 } 98 99 impl<T: ?Sized> Mutex<T> { 100 /// Attempt to acquire the lock immediately. 101 /// 102 /// If the lock is currently held, this will return `None`. try_lock(&self) -> Option<MutexGuard<'_, T>>103 pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> { 104 let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire); 105 if (old_state & IS_LOCKED) == 0 { 106 Some(MutexGuard { mutex: self }) 107 } else { 108 None 109 } 110 } 111 112 /// Attempt to acquire the lock immediately. 113 /// 114 /// If the lock is currently held, this will return `None`. try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>>115 pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> { 116 let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire); 117 if (old_state & IS_LOCKED) == 0 { 118 Some(OwnedMutexGuard { mutex: self.clone() }) 119 } else { 120 None 121 } 122 } 123 124 /// Acquire the lock asynchronously. 125 /// 126 /// This method returns a future that will resolve once the lock has been 127 /// successfully acquired. lock(&self) -> MutexLockFuture<'_, T>128 pub fn lock(&self) -> MutexLockFuture<'_, T> { 129 MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } 130 } 131 132 /// Acquire the lock asynchronously. 133 /// 134 /// This method returns a future that will resolve once the lock has been 135 /// successfully acquired. 
lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T>136 pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> { 137 OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } 138 } 139 140 /// Returns a mutable reference to the underlying data. 141 /// 142 /// Since this call borrows the `Mutex` mutably, no actual locking needs to 143 /// take place -- the mutable borrow statically guarantees no locks exist. 144 /// 145 /// # Examples 146 /// 147 /// ``` 148 /// # futures::executor::block_on(async { 149 /// use futures::lock::Mutex; 150 /// 151 /// let mut mutex = Mutex::new(0); 152 /// *mutex.get_mut() = 10; 153 /// assert_eq!(*mutex.lock().await, 10); 154 /// # }); 155 /// ``` get_mut(&mut self) -> &mut T156 pub fn get_mut(&mut self) -> &mut T { 157 // We know statically that there are no other references to `self`, so 158 // there's no need to lock the inner mutex. 159 unsafe { &mut *self.value.get() } 160 } 161 remove_waker(&self, wait_key: usize, wake_another: bool)162 fn remove_waker(&self, wait_key: usize, wake_another: bool) { 163 if wait_key != WAIT_KEY_NONE { 164 let mut waiters = self.waiters.lock().unwrap(); 165 match waiters.remove(wait_key) { 166 Waiter::Waiting(_) => {} 167 Waiter::Woken => { 168 // We were awoken, but then dropped before we could 169 // wake up to acquire the lock. Wake up another 170 // waiter. 171 if wake_another { 172 if let Some((_i, waiter)) = waiters.iter_mut().next() { 173 waiter.wake(); 174 } 175 } 176 } 177 } 178 if waiters.is_empty() { 179 self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock 180 } 181 } 182 } 183 184 // Unlocks the mutex. Called by MutexGuard and MappedMutexGuard when they are 185 // dropped. 
unlock(&self)186 fn unlock(&self) { 187 let old_state = self.state.fetch_and(!IS_LOCKED, Ordering::AcqRel); 188 if (old_state & HAS_WAITERS) != 0 { 189 let mut waiters = self.waiters.lock().unwrap(); 190 if let Some((_i, waiter)) = waiters.iter_mut().next() { 191 waiter.wake(); 192 } 193 } 194 } 195 } 196 197 // Sentinel for when no slot in the `Slab` has been dedicated to this object. 198 const WAIT_KEY_NONE: usize = usize::MAX; 199 200 /// A future which resolves when the target mutex has been successfully acquired, owned version. 201 pub struct OwnedMutexLockFuture<T: ?Sized> { 202 // `None` indicates that the mutex was successfully acquired. 203 mutex: Option<Arc<Mutex<T>>>, 204 wait_key: usize, 205 } 206 207 impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 209 f.debug_struct("OwnedMutexLockFuture") 210 .field("was_acquired", &self.mutex.is_none()) 211 .field("mutex", &self.mutex) 212 .field( 213 "wait_key", 214 &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), 215 ) 216 .finish() 217 } 218 } 219 220 impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> { is_terminated(&self) -> bool221 fn is_terminated(&self) -> bool { 222 self.mutex.is_none() 223 } 224 } 225 226 impl<T: ?Sized> Future for OwnedMutexLockFuture<T> { 227 type Output = OwnedMutexGuard<T>; 228 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>229 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 230 let this = self.get_mut(); 231 232 let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion"); 233 234 if let Some(lock) = mutex.try_lock_owned() { 235 mutex.remove_waker(this.wait_key, false); 236 this.mutex = None; 237 return Poll::Ready(lock); 238 } 239 240 { 241 let mut waiters = mutex.waiters.lock().unwrap(); 242 if this.wait_key == WAIT_KEY_NONE { 243 this.wait_key = 
waiters.insert(Waiter::Waiting(cx.waker().clone())); 244 if waiters.len() == 1 { 245 mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock 246 } 247 } else { 248 waiters[this.wait_key].register(cx.waker()); 249 } 250 } 251 252 // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by 253 // attempting to acquire the lock again. 254 if let Some(lock) = mutex.try_lock_owned() { 255 mutex.remove_waker(this.wait_key, false); 256 this.mutex = None; 257 return Poll::Ready(lock); 258 } 259 260 Poll::Pending 261 } 262 } 263 264 impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> { drop(&mut self)265 fn drop(&mut self) { 266 if let Some(mutex) = self.mutex.as_ref() { 267 // This future was dropped before it acquired the mutex. 268 // 269 // Remove ourselves from the map, waking up another waiter if we 270 // had been awoken to acquire the lock. 271 mutex.remove_waker(self.wait_key, true); 272 } 273 } 274 } 275 276 /// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods. 277 /// When this structure is dropped (falls out of scope), the lock will be 278 /// unlocked. 
279 pub struct OwnedMutexGuard<T: ?Sized> { 280 mutex: Arc<Mutex<T>>, 281 } 282 283 impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 285 f.debug_struct("OwnedMutexGuard") 286 .field("value", &&**self) 287 .field("mutex", &self.mutex) 288 .finish() 289 } 290 } 291 292 impl<T: ?Sized> Drop for OwnedMutexGuard<T> { drop(&mut self)293 fn drop(&mut self) { 294 self.mutex.unlock() 295 } 296 } 297 298 impl<T: ?Sized> Deref for OwnedMutexGuard<T> { 299 type Target = T; deref(&self) -> &T300 fn deref(&self) -> &T { 301 unsafe { &*self.mutex.value.get() } 302 } 303 } 304 305 impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> { deref_mut(&mut self) -> &mut T306 fn deref_mut(&mut self) -> &mut T { 307 unsafe { &mut *self.mutex.value.get() } 308 } 309 } 310 311 /// A future which resolves when the target mutex has been successfully acquired. 312 pub struct MutexLockFuture<'a, T: ?Sized> { 313 // `None` indicates that the mutex was successfully acquired. 
314 mutex: Option<&'a Mutex<T>>, 315 wait_key: usize, 316 } 317 318 impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 320 f.debug_struct("MutexLockFuture") 321 .field("was_acquired", &self.mutex.is_none()) 322 .field("mutex", &self.mutex) 323 .field( 324 "wait_key", 325 &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), 326 ) 327 .finish() 328 } 329 } 330 331 impl<T: ?Sized> FusedFuture for MutexLockFuture<'_, T> { is_terminated(&self) -> bool332 fn is_terminated(&self) -> bool { 333 self.mutex.is_none() 334 } 335 } 336 337 impl<'a, T: ?Sized> Future for MutexLockFuture<'a, T> { 338 type Output = MutexGuard<'a, T>; 339 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>340 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 341 let mutex = self.mutex.expect("polled MutexLockFuture after completion"); 342 343 if let Some(lock) = mutex.try_lock() { 344 mutex.remove_waker(self.wait_key, false); 345 self.mutex = None; 346 return Poll::Ready(lock); 347 } 348 349 { 350 let mut waiters = mutex.waiters.lock().unwrap(); 351 if self.wait_key == WAIT_KEY_NONE { 352 self.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone())); 353 if waiters.len() == 1 { 354 mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock 355 } 356 } else { 357 waiters[self.wait_key].register(cx.waker()); 358 } 359 } 360 361 // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by 362 // attempting to acquire the lock again. 
363 if let Some(lock) = mutex.try_lock() { 364 mutex.remove_waker(self.wait_key, false); 365 self.mutex = None; 366 return Poll::Ready(lock); 367 } 368 369 Poll::Pending 370 } 371 } 372 373 impl<T: ?Sized> Drop for MutexLockFuture<'_, T> { drop(&mut self)374 fn drop(&mut self) { 375 if let Some(mutex) = self.mutex { 376 // This future was dropped before it acquired the mutex. 377 // 378 // Remove ourselves from the map, waking up another waiter if we 379 // had been awoken to acquire the lock. 380 mutex.remove_waker(self.wait_key, true); 381 } 382 } 383 } 384 385 /// An RAII guard returned by the `lock` and `try_lock` methods. 386 /// When this structure is dropped (falls out of scope), the lock will be 387 /// unlocked. 388 pub struct MutexGuard<'a, T: ?Sized> { 389 mutex: &'a Mutex<T>, 390 } 391 392 impl<'a, T: ?Sized> MutexGuard<'a, T> { 393 /// Returns a locked view over a portion of the locked data. 394 /// 395 /// # Example 396 /// 397 /// ``` 398 /// # futures::executor::block_on(async { 399 /// use futures::lock::{Mutex, MutexGuard}; 400 /// 401 /// let data = Mutex::new(Some("value".to_string())); 402 /// { 403 /// let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap()); 404 /// assert_eq!(&*locked_str, "value"); 405 /// } 406 /// # }); 407 /// ``` 408 #[inline] map<U: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, U> where F: FnOnce(&mut T) -> &mut U,409 pub fn map<U: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, U> 410 where 411 F: FnOnce(&mut T) -> &mut U, 412 { 413 let mutex = this.mutex; 414 let value = f(unsafe { &mut *this.mutex.value.get() }); 415 // Don't run the `drop` method for MutexGuard. The ownership of the underlying 416 // locked state is being moved to the returned MappedMutexGuard. 
417 mem::forget(this); 418 MappedMutexGuard { mutex, value, _marker: PhantomData } 419 } 420 } 421 422 impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result423 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 424 f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish() 425 } 426 } 427 428 impl<T: ?Sized> Drop for MutexGuard<'_, T> { drop(&mut self)429 fn drop(&mut self) { 430 self.mutex.unlock() 431 } 432 } 433 434 impl<T: ?Sized> Deref for MutexGuard<'_, T> { 435 type Target = T; deref(&self) -> &T436 fn deref(&self) -> &T { 437 unsafe { &*self.mutex.value.get() } 438 } 439 } 440 441 impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { deref_mut(&mut self) -> &mut T442 fn deref_mut(&mut self) -> &mut T { 443 unsafe { &mut *self.mutex.value.get() } 444 } 445 } 446 447 /// An RAII guard returned by the `MutexGuard::map` and `MappedMutexGuard::map` methods. 448 /// When this structure is dropped (falls out of scope), the lock will be unlocked. 449 pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> { 450 mutex: &'a Mutex<T>, 451 value: *mut U, 452 _marker: PhantomData<&'a mut U>, 453 } 454 455 impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> { 456 /// Returns a locked view over a portion of the locked data. 
457 /// 458 /// # Example 459 /// 460 /// ``` 461 /// # futures::executor::block_on(async { 462 /// use futures::lock::{MappedMutexGuard, Mutex, MutexGuard}; 463 /// 464 /// let data = Mutex::new(Some("value".to_string())); 465 /// { 466 /// let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap()); 467 /// let locked_char = MappedMutexGuard::map(locked_str, |s| s.get_mut(0..1).unwrap()); 468 /// assert_eq!(&*locked_char, "v"); 469 /// } 470 /// # }); 471 /// ``` 472 #[inline] map<V: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, V> where F: FnOnce(&mut U) -> &mut V,473 pub fn map<V: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, V> 474 where 475 F: FnOnce(&mut U) -> &mut V, 476 { 477 let mutex = this.mutex; 478 let value = f(unsafe { &mut *this.value }); 479 // Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying 480 // locked state is being moved to the returned MappedMutexGuard. 481 mem::forget(this); 482 MappedMutexGuard { mutex, value, _marker: PhantomData } 483 } 484 } 485 486 impl<T: ?Sized, U: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'_, T, U> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result487 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 488 f.debug_struct("MappedMutexGuard") 489 .field("value", &&**self) 490 .field("mutex", &self.mutex) 491 .finish() 492 } 493 } 494 495 impl<T: ?Sized, U: ?Sized> Drop for MappedMutexGuard<'_, T, U> { drop(&mut self)496 fn drop(&mut self) { 497 self.mutex.unlock() 498 } 499 } 500 501 impl<T: ?Sized, U: ?Sized> Deref for MappedMutexGuard<'_, T, U> { 502 type Target = U; deref(&self) -> &U503 fn deref(&self) -> &U { 504 unsafe { &*self.value } 505 } 506 } 507 508 impl<T: ?Sized, U: ?Sized> DerefMut for MappedMutexGuard<'_, T, U> { deref_mut(&mut self) -> &mut U509 fn deref_mut(&mut self) -> &mut U { 510 unsafe { &mut *self.value } 511 } 512 } 513 514 // Mutexes can be moved freely between threads and acquired on any thread 
// so long as the inner value can be safely sent between threads.
//
// Note that `Sync` only asks for `T: Send` here, not `T: Sync`.
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}

// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}

// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}

// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}

// Safe to send since we don't track any thread-specific details-- the inner
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
//
// The guards are `Sync` only when `T: Sync`, because a shared reference to a
// guard hands out `&T` through `Deref`.
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}

unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}

// A mapped guard exposes `U` but still unlocks the `Mutex<T>` on drop, so the
// bounds are placed on both type parameters.
unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}

#[cfg(test)]
mod tests {
    use super::*;
    use std::format;

    // Formats both a plain and a mapped guard with `{:?}` while the lock is
    // held. The guards' `Debug` impls print the mutex itself, so this checks
    // that doing so completes instead of recursing.
    #[test]
    fn test_mutex_guard_debug_not_recurse() {
        let mutex = Mutex::new(42);
        let guard = mutex.try_lock().unwrap();
        let _ = format!("{:?}", guard);
        let guard = MutexGuard::map(guard, |n| n);
        let _ = format!("{:?}", guard);
    }
}