1 use crate::sync::batch_semaphore::{Semaphore, TryAcquireError};
2 use crate::sync::mutex::TryLockError;
3 #[cfg(all(tokio_unstable, feature = "tracing"))]
4 use crate::util::trace;
5 use std::cell::UnsafeCell;
6 use std::marker;
7 use std::marker::PhantomData;
8 use std::sync::Arc;
9 
10 pub(crate) mod owned_read_guard;
11 pub(crate) mod owned_write_guard;
12 pub(crate) mod owned_write_guard_mapped;
13 pub(crate) mod read_guard;
14 pub(crate) mod write_guard;
15 pub(crate) mod write_guard_mapped;
16 pub(crate) use owned_read_guard::OwnedRwLockReadGuard;
17 pub(crate) use owned_write_guard::OwnedRwLockWriteGuard;
18 pub(crate) use owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
19 pub(crate) use read_guard::RwLockReadGuard;
20 pub(crate) use write_guard::RwLockWriteGuard;
21 pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
22 
23 #[cfg(not(loom))]
24 const MAX_READS: u32 = u32::MAX >> 3;
25 
26 #[cfg(loom)]
27 const MAX_READS: u32 = 10;
28 
29 /// An asynchronous reader-writer lock.
30 ///
31 /// This type of lock allows a number of readers or at most one writer at any
32 /// point in time. The write portion of this lock typically allows modification
33 /// of the underlying data (exclusive access) and the read portion of this lock
34 /// typically allows for read-only access (shared access).
35 ///
36 /// In comparison, a [`Mutex`] does not distinguish between readers or writers
37 /// that acquire the lock, therefore causing any tasks waiting for the lock to
38 /// become available to yield. An `RwLock` will allow any number of readers to
39 /// acquire the lock as long as a writer is not holding the lock.
40 ///
41 /// The priority policy of Tokio's read-write lock is _fair_ (or
42 /// [_write-preferring_]), in order to ensure that readers cannot starve
43 /// writers. Fairness is ensured using a first-in, first-out queue for the tasks
44 /// awaiting the lock; if a task that wishes to acquire the write lock is at the
45 /// head of the queue, read locks will not be given out until the write lock has
46 /// been released. This is in contrast to the Rust standard library's
47 /// `std::sync::RwLock`, where the priority policy is dependent on the
48 /// operating system's implementation.
49 ///
/// The type parameter `T` represents the data that this lock protects. `T`
/// must be [`Send`] for the `RwLock` to be sent to another thread, and both
/// [`Send`] and [`Sync`] for it to be shared across threads. The RAII guards
/// returned from the locking methods implement [`Deref`](trait@std::ops::Deref)
/// (and [`DerefMut`](trait@std::ops::DerefMut)
/// for the `write` methods) to allow access to the content of the lock.
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// use tokio::sync::RwLock;
60 ///
61 /// #[tokio::main]
62 /// async fn main() {
63 ///     let lock = RwLock::new(5);
64 ///
65 ///     // many reader locks can be held at once
66 ///     {
67 ///         let r1 = lock.read().await;
68 ///         let r2 = lock.read().await;
69 ///         assert_eq!(*r1, 5);
70 ///         assert_eq!(*r2, 5);
71 ///     } // read locks are dropped at this point
72 ///
73 ///     // only one write lock may be held, however
74 ///     {
75 ///         let mut w = lock.write().await;
76 ///         *w += 1;
77 ///         assert_eq!(*w, 6);
78 ///     } // write lock is dropped here
79 /// }
80 /// ```
81 ///
82 /// [`Mutex`]: struct@super::Mutex
83 /// [`RwLock`]: struct@RwLock
84 /// [`RwLockReadGuard`]: struct@RwLockReadGuard
85 /// [`RwLockWriteGuard`]: struct@RwLockWriteGuard
86 /// [`Send`]: trait@std::marker::Send
87 /// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
pub struct RwLock<T: ?Sized> {
    // Tracing span in which this lock's state-update events are emitted.
    // Only present when built with `tokio_unstable` and the `tracing` feature.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,

    // Maximum number of concurrent readers. This is also the number of
    // semaphore permits a writer acquires to obtain exclusive access.
    mr: u32,

    // Semaphore coordinating read and write access to `T`: a reader acquires
    // one permit, a writer acquires `mr` permits.
    s: Semaphore,

    // The data protected by the lock.
    c: UnsafeCell<T>,
}
101 
102 #[test]
103 #[cfg(not(loom))]
bounds()104 fn bounds() {
105     fn check_send<T: Send>() {}
106     fn check_sync<T: Sync>() {}
107     fn check_unpin<T: Unpin>() {}
108     // This has to take a value, since the async fn's return type is unnameable.
109     fn check_send_sync_val<T: Send + Sync>(_t: T) {}
110 
111     check_send::<RwLock<u32>>();
112     check_sync::<RwLock<u32>>();
113     check_unpin::<RwLock<u32>>();
114 
115     check_send::<RwLockReadGuard<'_, u32>>();
116     check_sync::<RwLockReadGuard<'_, u32>>();
117     check_unpin::<RwLockReadGuard<'_, u32>>();
118 
119     check_send::<OwnedRwLockReadGuard<u32, i32>>();
120     check_sync::<OwnedRwLockReadGuard<u32, i32>>();
121     check_unpin::<OwnedRwLockReadGuard<u32, i32>>();
122 
123     check_send::<RwLockWriteGuard<'_, u32>>();
124     check_sync::<RwLockWriteGuard<'_, u32>>();
125     check_unpin::<RwLockWriteGuard<'_, u32>>();
126 
127     check_send::<RwLockMappedWriteGuard<'_, u32>>();
128     check_sync::<RwLockMappedWriteGuard<'_, u32>>();
129     check_unpin::<RwLockMappedWriteGuard<'_, u32>>();
130 
131     check_send::<OwnedRwLockWriteGuard<u32>>();
132     check_sync::<OwnedRwLockWriteGuard<u32>>();
133     check_unpin::<OwnedRwLockWriteGuard<u32>>();
134 
135     check_send::<OwnedRwLockMappedWriteGuard<u32, i32>>();
136     check_sync::<OwnedRwLockMappedWriteGuard<u32, i32>>();
137     check_unpin::<OwnedRwLockMappedWriteGuard<u32, i32>>();
138 
139     let rwlock = Arc::new(RwLock::new(0));
140     check_send_sync_val(rwlock.read());
141     check_send_sync_val(Arc::clone(&rwlock).read_owned());
142     check_send_sync_val(rwlock.write());
143     check_send_sync_val(Arc::clone(&rwlock).write_owned());
144 }
145 
146 // As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
147 // If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
148 // RwLock<T>.
149 unsafe impl<T> Send for RwLock<T> where T: ?Sized + Send {}
150 unsafe impl<T> Sync for RwLock<T> where T: ?Sized + Send + Sync {}
151 // NB: These impls need to be explicit since we're storing a raw pointer.
152 // Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
153 // `T` is `Send`.
154 unsafe impl<T> Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {}
155 unsafe impl<T> Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {}
156 // T is required to be `Send` because an OwnedRwLockReadGuard can be used to drop the value held in
157 // the RwLock, unlike RwLockReadGuard.
158 unsafe impl<T, U> Send for OwnedRwLockReadGuard<T, U>
159 where
160     T: ?Sized + Send + Sync,
161     U: ?Sized + Sync,
162 {
163 }
164 unsafe impl<T, U> Sync for OwnedRwLockReadGuard<T, U>
165 where
166     T: ?Sized + Send + Sync,
167     U: ?Sized + Send + Sync,
168 {
169 }
170 unsafe impl<T> Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
171 unsafe impl<T> Sync for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
172 unsafe impl<T> Sync for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
173 unsafe impl<T, U> Sync for OwnedRwLockMappedWriteGuard<T, U>
174 where
175     T: ?Sized + Send + Sync,
176     U: ?Sized + Send + Sync,
177 {
178 }
179 // Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over
180 // `T` is `Send` - but since this is also provides mutable access, we need to
181 // make sure that `T` is `Send` since its value can be sent across thread
182 // boundaries.
183 unsafe impl<T> Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
184 unsafe impl<T> Send for OwnedRwLockWriteGuard<T> where T: ?Sized + Send + Sync {}
185 unsafe impl<T> Send for RwLockMappedWriteGuard<'_, T> where T: ?Sized + Send + Sync {}
186 unsafe impl<T, U> Send for OwnedRwLockMappedWriteGuard<T, U>
187 where
188     T: ?Sized + Send + Sync,
189     U: ?Sized + Send + Sync,
190 {
191 }
192 
193 impl<T: ?Sized> RwLock<T> {
194     /// Creates a new instance of an `RwLock<T>` which is unlocked.
195     ///
196     /// # Examples
197     ///
198     /// ```
199     /// use tokio::sync::RwLock;
200     ///
201     /// let lock = RwLock::new(5);
202     /// ```
203     #[track_caller]
new(value: T) -> RwLock<T> where T: Sized,204     pub fn new(value: T) -> RwLock<T>
205     where
206         T: Sized,
207     {
208         #[cfg(all(tokio_unstable, feature = "tracing"))]
209         let resource_span = {
210             let location = std::panic::Location::caller();
211             let resource_span = tracing::trace_span!(
212                 parent: None,
213                 "runtime.resource",
214                 concrete_type = "RwLock",
215                 kind = "Sync",
216                 loc.file = location.file(),
217                 loc.line = location.line(),
218                 loc.col = location.column(),
219             );
220 
221             resource_span.in_scope(|| {
222                 tracing::trace!(
223                     target: "runtime::resource::state_update",
224                     max_readers = MAX_READS,
225                 );
226 
227                 tracing::trace!(
228                     target: "runtime::resource::state_update",
229                     write_locked = false,
230                 );
231 
232                 tracing::trace!(
233                     target: "runtime::resource::state_update",
234                     current_readers = 0,
235                 );
236             });
237 
238             resource_span
239         };
240 
241         #[cfg(all(tokio_unstable, feature = "tracing"))]
242         let s = resource_span.in_scope(|| Semaphore::new(MAX_READS as usize));
243 
244         #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
245         let s = Semaphore::new(MAX_READS as usize);
246 
247         RwLock {
248             mr: MAX_READS,
249             c: UnsafeCell::new(value),
250             s,
251             #[cfg(all(tokio_unstable, feature = "tracing"))]
252             resource_span,
253         }
254     }
255 
256     /// Creates a new instance of an `RwLock<T>` which is unlocked
257     /// and allows a maximum of `max_reads` concurrent readers.
258     ///
259     /// # Examples
260     ///
261     /// ```
262     /// use tokio::sync::RwLock;
263     ///
264     /// let lock = RwLock::with_max_readers(5, 1024);
265     /// ```
266     ///
267     /// # Panics
268     ///
269     /// Panics if `max_reads` is more than `u32::MAX >> 3`.
270     #[track_caller]
with_max_readers(value: T, max_reads: u32) -> RwLock<T> where T: Sized,271     pub fn with_max_readers(value: T, max_reads: u32) -> RwLock<T>
272     where
273         T: Sized,
274     {
275         assert!(
276             max_reads <= MAX_READS,
277             "a RwLock may not be created with more than {MAX_READS} readers"
278         );
279 
280         #[cfg(all(tokio_unstable, feature = "tracing"))]
281         let resource_span = {
282             let location = std::panic::Location::caller();
283 
284             let resource_span = tracing::trace_span!(
285                 parent: None,
286                 "runtime.resource",
287                 concrete_type = "RwLock",
288                 kind = "Sync",
289                 loc.file = location.file(),
290                 loc.line = location.line(),
291                 loc.col = location.column(),
292             );
293 
294             resource_span.in_scope(|| {
295                 tracing::trace!(
296                     target: "runtime::resource::state_update",
297                     max_readers = max_reads,
298                 );
299 
300                 tracing::trace!(
301                     target: "runtime::resource::state_update",
302                     write_locked = false,
303                 );
304 
305                 tracing::trace!(
306                     target: "runtime::resource::state_update",
307                     current_readers = 0,
308                 );
309             });
310 
311             resource_span
312         };
313 
314         #[cfg(all(tokio_unstable, feature = "tracing"))]
315         let s = resource_span.in_scope(|| Semaphore::new(max_reads as usize));
316 
317         #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
318         let s = Semaphore::new(max_reads as usize);
319 
320         RwLock {
321             mr: max_reads,
322             c: UnsafeCell::new(value),
323             s,
324             #[cfg(all(tokio_unstable, feature = "tracing"))]
325             resource_span,
326         }
327     }
328 
329     /// Creates a new instance of an `RwLock<T>` which is unlocked.
330     ///
331     /// When using the `tracing` [unstable feature], a `RwLock` created with
332     /// `const_new` will not be instrumented. As such, it will not be visible
333     /// in [`tokio-console`]. Instead, [`RwLock::new`] should be used to create
334     /// an instrumented object if that is needed.
335     ///
336     /// # Examples
337     ///
338     /// ```
339     /// use tokio::sync::RwLock;
340     ///
341     /// static LOCK: RwLock<i32> = RwLock::const_new(5);
342     /// ```
343     ///
344     /// [`tokio-console`]: https://github.com/tokio-rs/console
345     /// [unstable feature]: crate#unstable-features
346     #[cfg(not(all(loom, test)))]
const_new(value: T) -> RwLock<T> where T: Sized,347     pub const fn const_new(value: T) -> RwLock<T>
348     where
349         T: Sized,
350     {
351         RwLock {
352             mr: MAX_READS,
353             c: UnsafeCell::new(value),
354             s: Semaphore::const_new(MAX_READS as usize),
355             #[cfg(all(tokio_unstable, feature = "tracing"))]
356             resource_span: tracing::Span::none(),
357         }
358     }
359 
360     /// Creates a new instance of an `RwLock<T>` which is unlocked
361     /// and allows a maximum of `max_reads` concurrent readers.
362     ///
363     /// # Examples
364     ///
365     /// ```
366     /// use tokio::sync::RwLock;
367     ///
368     /// static LOCK: RwLock<i32> = RwLock::const_with_max_readers(5, 1024);
369     /// ```
370     #[cfg(not(all(loom, test)))]
const_with_max_readers(value: T, max_reads: u32) -> RwLock<T> where T: Sized,371     pub const fn const_with_max_readers(value: T, max_reads: u32) -> RwLock<T>
372     where
373         T: Sized,
374     {
375         assert!(max_reads <= MAX_READS);
376 
377         RwLock {
378             mr: max_reads,
379             c: UnsafeCell::new(value),
380             s: Semaphore::const_new(max_reads as usize),
381             #[cfg(all(tokio_unstable, feature = "tracing"))]
382             resource_span: tracing::Span::none(),
383         }
384     }
385 
386     /// Locks this `RwLock` with shared read access, causing the current task
387     /// to yield until the lock has been acquired.
388     ///
389     /// The calling task will yield until there are no writers which hold the
390     /// lock. There may be other readers inside the lock when the task resumes.
391     ///
392     /// Note that under the priority policy of [`RwLock`], read locks are not
393     /// granted until prior write locks, to prevent starvation. Therefore
394     /// deadlock may occur if a read lock is held by the current task, a write
395     /// lock attempt is made, and then a subsequent read lock attempt is made
396     /// by the current task.
397     ///
398     /// Returns an RAII guard which will drop this read access of the `RwLock`
399     /// when dropped.
400     ///
401     /// # Cancel safety
402     ///
403     /// This method uses a queue to fairly distribute locks in the order they
404     /// were requested. Cancelling a call to `read` makes you lose your place in
405     /// the queue.
406     ///
407     /// # Examples
408     ///
409     /// ```
410     /// use std::sync::Arc;
411     /// use tokio::sync::RwLock;
412     ///
413     /// #[tokio::main]
414     /// async fn main() {
415     ///     let lock = Arc::new(RwLock::new(1));
416     ///     let c_lock = lock.clone();
417     ///
418     ///     let n = lock.read().await;
419     ///     assert_eq!(*n, 1);
420     ///
421     ///     tokio::spawn(async move {
422     ///         // While main has an active read lock, we acquire one too.
423     ///         let r = c_lock.read().await;
424     ///         assert_eq!(*r, 1);
425     ///     }).await.expect("The spawned task has panicked");
426     ///
427     ///     // Drop the guard after the spawned task finishes.
428     ///     drop(n);
429     /// }
430     /// ```
read(&self) -> RwLockReadGuard<'_, T>431     pub async fn read(&self) -> RwLockReadGuard<'_, T> {
432         let acquire_fut = async {
433             self.s.acquire(1).await.unwrap_or_else(|_| {
434                 // The semaphore was closed. but, we never explicitly close it, and we have a
435                 // handle to it through the Arc, which means that this can never happen.
436                 unreachable!()
437             });
438 
439             RwLockReadGuard {
440                 s: &self.s,
441                 data: self.c.get(),
442                 marker: PhantomData,
443                 #[cfg(all(tokio_unstable, feature = "tracing"))]
444                 resource_span: self.resource_span.clone(),
445             }
446         };
447 
448         #[cfg(all(tokio_unstable, feature = "tracing"))]
449         let acquire_fut = trace::async_op(
450             move || acquire_fut,
451             self.resource_span.clone(),
452             "RwLock::read",
453             "poll",
454             false,
455         );
456 
457         #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
458         let guard = acquire_fut.await;
459 
460         #[cfg(all(tokio_unstable, feature = "tracing"))]
461         self.resource_span.in_scope(|| {
462             tracing::trace!(
463             target: "runtime::resource::state_update",
464             current_readers = 1,
465             current_readers.op = "add",
466             )
467         });
468 
469         guard
470     }
471 
472     /// Blockingly locks this `RwLock` with shared read access.
473     ///
474     /// This method is intended for use cases where you
475     /// need to use this rwlock in asynchronous code as well as in synchronous code.
476     ///
477     /// Returns an RAII guard which will drop the read access of this `RwLock` when dropped.
478     ///
479     /// # Panics
480     ///
481     /// This function panics if called within an asynchronous execution context.
482     ///
483     ///   - If you find yourself in an asynchronous execution context and needing
484     ///     to call some (synchronous) function which performs one of these
485     ///     `blocking_` operations, then consider wrapping that call inside
486     ///     [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
487     ///     (or [`block_in_place()`][crate::task::block_in_place]).
488     ///
489     /// # Examples
490     ///
491     /// ```
492     /// use std::sync::Arc;
493     /// use tokio::sync::RwLock;
494     ///
495     /// #[tokio::main]
496     /// async fn main() {
497     ///     let rwlock = Arc::new(RwLock::new(1));
498     ///     let mut write_lock = rwlock.write().await;
499     ///
500     ///     let blocking_task = tokio::task::spawn_blocking({
501     ///         let rwlock = Arc::clone(&rwlock);
502     ///         move || {
503     ///             // This shall block until the `write_lock` is released.
504     ///             let read_lock = rwlock.blocking_read();
505     ///             assert_eq!(*read_lock, 0);
506     ///         }
507     ///     });
508     ///
509     ///     *write_lock -= 1;
510     ///     drop(write_lock); // release the lock.
511     ///
512     ///     // Await the completion of the blocking task.
513     ///     blocking_task.await.unwrap();
514     ///
515     ///     // Assert uncontended.
516     ///     assert!(rwlock.try_write().is_ok());
517     /// }
518     /// ```
519     #[track_caller]
520     #[cfg(feature = "sync")]
blocking_read(&self) -> RwLockReadGuard<'_, T>521     pub fn blocking_read(&self) -> RwLockReadGuard<'_, T> {
522         crate::future::block_on(self.read())
523     }
524 
525     /// Locks this `RwLock` with shared read access, causing the current task
526     /// to yield until the lock has been acquired.
527     ///
528     /// The calling task will yield until there are no writers which hold the
529     /// lock. There may be other readers inside the lock when the task resumes.
530     ///
531     /// This method is identical to [`RwLock::read`], except that the returned
532     /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
533     /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
534     /// method, and the guard will live for the `'static` lifetime, as it keeps
535     /// the `RwLock` alive by holding an `Arc`.
536     ///
537     /// Note that under the priority policy of [`RwLock`], read locks are not
538     /// granted until prior write locks, to prevent starvation. Therefore
539     /// deadlock may occur if a read lock is held by the current task, a write
540     /// lock attempt is made, and then a subsequent read lock attempt is made
541     /// by the current task.
542     ///
543     /// Returns an RAII guard which will drop this read access of the `RwLock`
544     /// when dropped.
545     ///
546     /// # Cancel safety
547     ///
548     /// This method uses a queue to fairly distribute locks in the order they
549     /// were requested. Cancelling a call to `read_owned` makes you lose your
550     /// place in the queue.
551     ///
552     /// # Examples
553     ///
554     /// ```
555     /// use std::sync::Arc;
556     /// use tokio::sync::RwLock;
557     ///
558     /// #[tokio::main]
559     /// async fn main() {
560     ///     let lock = Arc::new(RwLock::new(1));
561     ///     let c_lock = lock.clone();
562     ///
563     ///     let n = lock.read_owned().await;
564     ///     assert_eq!(*n, 1);
565     ///
566     ///     tokio::spawn(async move {
567     ///         // While main has an active read lock, we acquire one too.
568     ///         let r = c_lock.read_owned().await;
569     ///         assert_eq!(*r, 1);
570     ///     }).await.expect("The spawned task has panicked");
571     ///
572     ///     // Drop the guard after the spawned task finishes.
573     ///     drop(n);
    /// }
575     /// ```
read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T>576     pub async fn read_owned(self: Arc<Self>) -> OwnedRwLockReadGuard<T> {
577         #[cfg(all(tokio_unstable, feature = "tracing"))]
578         let resource_span = self.resource_span.clone();
579 
580         let acquire_fut = async {
581             self.s.acquire(1).await.unwrap_or_else(|_| {
582                 // The semaphore was closed. but, we never explicitly close it, and we have a
583                 // handle to it through the Arc, which means that this can never happen.
584                 unreachable!()
585             });
586 
587             OwnedRwLockReadGuard {
588                 #[cfg(all(tokio_unstable, feature = "tracing"))]
589                 resource_span: self.resource_span.clone(),
590                 data: self.c.get(),
591                 lock: self,
592                 _p: PhantomData,
593             }
594         };
595 
596         #[cfg(all(tokio_unstable, feature = "tracing"))]
597         let acquire_fut = trace::async_op(
598             move || acquire_fut,
599             resource_span,
600             "RwLock::read_owned",
601             "poll",
602             false,
603         );
604 
605         #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
606         let guard = acquire_fut.await;
607 
608         #[cfg(all(tokio_unstable, feature = "tracing"))]
609         guard.resource_span.in_scope(|| {
610             tracing::trace!(
611             target: "runtime::resource::state_update",
612             current_readers = 1,
613             current_readers.op = "add",
614             )
615         });
616 
617         guard
618     }
619 
620     /// Attempts to acquire this `RwLock` with shared read access.
621     ///
622     /// If the access couldn't be acquired immediately, returns [`TryLockError`].
623     /// Otherwise, an RAII guard is returned which will release read access
624     /// when dropped.
625     ///
626     /// [`TryLockError`]: TryLockError
627     ///
628     /// # Examples
629     ///
630     /// ```
631     /// use std::sync::Arc;
632     /// use tokio::sync::RwLock;
633     ///
634     /// #[tokio::main]
635     /// async fn main() {
636     ///     let lock = Arc::new(RwLock::new(1));
637     ///     let c_lock = lock.clone();
638     ///
639     ///     let v = lock.try_read().unwrap();
640     ///     assert_eq!(*v, 1);
641     ///
642     ///     tokio::spawn(async move {
643     ///         // While main has an active read lock, we acquire one too.
644     ///         let n = c_lock.read().await;
645     ///         assert_eq!(*n, 1);
646     ///     }).await.expect("The spawned task has panicked");
647     ///
648     ///     // Drop the guard when spawned task finishes.
649     ///     drop(v);
650     /// }
651     /// ```
try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError>652     pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryLockError> {
653         match self.s.try_acquire(1) {
654             Ok(permit) => permit,
655             Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
656             Err(TryAcquireError::Closed) => unreachable!(),
657         }
658 
659         let guard = RwLockReadGuard {
660             s: &self.s,
661             data: self.c.get(),
662             marker: marker::PhantomData,
663             #[cfg(all(tokio_unstable, feature = "tracing"))]
664             resource_span: self.resource_span.clone(),
665         };
666 
667         #[cfg(all(tokio_unstable, feature = "tracing"))]
668         self.resource_span.in_scope(|| {
669             tracing::trace!(
670             target: "runtime::resource::state_update",
671             current_readers = 1,
672             current_readers.op = "add",
673             )
674         });
675 
676         Ok(guard)
677     }
678 
679     /// Attempts to acquire this `RwLock` with shared read access.
680     ///
681     /// If the access couldn't be acquired immediately, returns [`TryLockError`].
682     /// Otherwise, an RAII guard is returned which will release read access
683     /// when dropped.
684     ///
685     /// This method is identical to [`RwLock::try_read`], except that the
686     /// returned guard references the `RwLock` with an [`Arc`] rather than by
687     /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
688     /// call this method, and the guard will live for the `'static` lifetime,
689     /// as it keeps the `RwLock` alive by holding an `Arc`.
690     ///
691     /// [`TryLockError`]: TryLockError
692     ///
693     /// # Examples
694     ///
695     /// ```
696     /// use std::sync::Arc;
697     /// use tokio::sync::RwLock;
698     ///
699     /// #[tokio::main]
700     /// async fn main() {
701     ///     let lock = Arc::new(RwLock::new(1));
702     ///     let c_lock = lock.clone();
703     ///
704     ///     let v = lock.try_read_owned().unwrap();
705     ///     assert_eq!(*v, 1);
706     ///
707     ///     tokio::spawn(async move {
708     ///         // While main has an active read lock, we acquire one too.
709     ///         let n = c_lock.read_owned().await;
710     ///         assert_eq!(*n, 1);
711     ///     }).await.expect("The spawned task has panicked");
712     ///
713     ///     // Drop the guard when spawned task finishes.
714     ///     drop(v);
715     /// }
716     /// ```
try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError>717     pub fn try_read_owned(self: Arc<Self>) -> Result<OwnedRwLockReadGuard<T>, TryLockError> {
718         match self.s.try_acquire(1) {
719             Ok(permit) => permit,
720             Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
721             Err(TryAcquireError::Closed) => unreachable!(),
722         }
723 
724         let guard = OwnedRwLockReadGuard {
725             #[cfg(all(tokio_unstable, feature = "tracing"))]
726             resource_span: self.resource_span.clone(),
727             data: self.c.get(),
728             lock: self,
729             _p: PhantomData,
730         };
731 
732         #[cfg(all(tokio_unstable, feature = "tracing"))]
733         guard.resource_span.in_scope(|| {
734             tracing::trace!(
735             target: "runtime::resource::state_update",
736             current_readers = 1,
737             current_readers.op = "add",
738             )
739         });
740 
741         Ok(guard)
742     }
743 
744     /// Locks this `RwLock` with exclusive write access, causing the current
745     /// task to yield until the lock has been acquired.
746     ///
747     /// The calling task will yield while other writers or readers currently
748     /// have access to the lock.
749     ///
750     /// Returns an RAII guard which will drop the write access of this `RwLock`
751     /// when dropped.
752     ///
753     /// # Cancel safety
754     ///
755     /// This method uses a queue to fairly distribute locks in the order they
756     /// were requested. Cancelling a call to `write` makes you lose your place
757     /// in the queue.
758     ///
759     /// # Examples
760     ///
761     /// ```
762     /// use tokio::sync::RwLock;
763     ///
764     /// #[tokio::main]
765     /// async fn main() {
766     ///   let lock = RwLock::new(1);
767     ///
768     ///   let mut n = lock.write().await;
769     ///   *n = 2;
    /// }
771     /// ```
write(&self) -> RwLockWriteGuard<'_, T>772     pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
773         let acquire_fut = async {
774             self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| {
775                 // The semaphore was closed. but, we never explicitly close it, and we have a
776                 // handle to it through the Arc, which means that this can never happen.
777                 unreachable!()
778             });
779 
780             RwLockWriteGuard {
781                 permits_acquired: self.mr,
782                 s: &self.s,
783                 data: self.c.get(),
784                 marker: marker::PhantomData,
785                 #[cfg(all(tokio_unstable, feature = "tracing"))]
786                 resource_span: self.resource_span.clone(),
787             }
788         };
789 
790         #[cfg(all(tokio_unstable, feature = "tracing"))]
791         let acquire_fut = trace::async_op(
792             move || acquire_fut,
793             self.resource_span.clone(),
794             "RwLock::write",
795             "poll",
796             false,
797         );
798 
799         #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
800         let guard = acquire_fut.await;
801 
802         #[cfg(all(tokio_unstable, feature = "tracing"))]
803         self.resource_span.in_scope(|| {
804             tracing::trace!(
805             target: "runtime::resource::state_update",
806             write_locked = true,
807             write_locked.op = "override",
808             )
809         });
810 
811         guard
812     }
813 
814     /// Blockingly locks this `RwLock` with exclusive write access.
815     ///
816     /// This method is intended for use cases where you
817     /// need to use this rwlock in asynchronous code as well as in synchronous code.
818     ///
819     /// Returns an RAII guard which will drop the write access of this `RwLock` when dropped.
820     ///
821     /// # Panics
822     ///
823     /// This function panics if called within an asynchronous execution context.
824     ///
825     ///   - If you find yourself in an asynchronous execution context and needing
826     ///     to call some (synchronous) function which performs one of these
827     ///     `blocking_` operations, then consider wrapping that call inside
828     ///     [`spawn_blocking()`][crate::runtime::Handle::spawn_blocking]
829     ///     (or [`block_in_place()`][crate::task::block_in_place]).
830     ///
831     /// # Examples
832     ///
833     /// ```
834     /// use std::sync::Arc;
835     /// use tokio::{sync::RwLock};
836     ///
837     /// #[tokio::main]
838     /// async fn main() {
839     ///     let rwlock =  Arc::new(RwLock::new(1));
840     ///     let read_lock = rwlock.read().await;
841     ///
842     ///     let blocking_task = tokio::task::spawn_blocking({
843     ///         let rwlock = Arc::clone(&rwlock);
844     ///         move || {
845     ///             // This shall block until the `read_lock` is released.
846     ///             let mut write_lock = rwlock.blocking_write();
847     ///             *write_lock = 2;
848     ///         }
849     ///     });
850     ///
851     ///     assert_eq!(*read_lock, 1);
852     ///     // Release the last outstanding read lock.
853     ///     drop(read_lock);
854     ///
855     ///     // Await the completion of the blocking task.
856     ///     blocking_task.await.unwrap();
857     ///
858     ///     // Assert uncontended.
859     ///     let read_lock = rwlock.try_read().unwrap();
860     ///     assert_eq!(*read_lock, 2);
861     /// }
862     /// ```
863     #[track_caller]
864     #[cfg(feature = "sync")]
blocking_write(&self) -> RwLockWriteGuard<'_, T>865     pub fn blocking_write(&self) -> RwLockWriteGuard<'_, T> {
866         crate::future::block_on(self.write())
867     }
868 
869     /// Locks this `RwLock` with exclusive write access, causing the current
870     /// task to yield until the lock has been acquired.
871     ///
872     /// The calling task will yield while other writers or readers currently
873     /// have access to the lock.
874     ///
875     /// This method is identical to [`RwLock::write`], except that the returned
876     /// guard references the `RwLock` with an [`Arc`] rather than by borrowing
877     /// it. Therefore, the `RwLock` must be wrapped in an `Arc` to call this
878     /// method, and the guard will live for the `'static` lifetime, as it keeps
879     /// the `RwLock` alive by holding an `Arc`.
880     ///
881     /// Returns an RAII guard which will drop the write access of this `RwLock`
882     /// when dropped.
883     ///
884     /// # Cancel safety
885     ///
886     /// This method uses a queue to fairly distribute locks in the order they
887     /// were requested. Cancelling a call to `write_owned` makes you lose your
888     /// place in the queue.
889     ///
890     /// # Examples
891     ///
892     /// ```
893     /// use std::sync::Arc;
894     /// use tokio::sync::RwLock;
895     ///
896     /// #[tokio::main]
897     /// async fn main() {
898     ///   let lock = Arc::new(RwLock::new(1));
899     ///
900     ///   let mut n = lock.write_owned().await;
901     ///   *n = 2;
902     ///}
903     /// ```
write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T>904     pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
905         #[cfg(all(tokio_unstable, feature = "tracing"))]
906         let resource_span = self.resource_span.clone();
907 
908         let acquire_fut = async {
909             self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| {
910                 // The semaphore was closed. but, we never explicitly close it, and we have a
911                 // handle to it through the Arc, which means that this can never happen.
912                 unreachable!()
913             });
914 
915             OwnedRwLockWriteGuard {
916                 #[cfg(all(tokio_unstable, feature = "tracing"))]
917                 resource_span: self.resource_span.clone(),
918                 permits_acquired: self.mr,
919                 data: self.c.get(),
920                 lock: self,
921                 _p: PhantomData,
922             }
923         };
924 
925         #[cfg(all(tokio_unstable, feature = "tracing"))]
926         let acquire_fut = trace::async_op(
927             move || acquire_fut,
928             resource_span,
929             "RwLock::write_owned",
930             "poll",
931             false,
932         );
933 
934         #[allow(clippy::let_and_return)] // this lint triggers when disabling tracing
935         let guard = acquire_fut.await;
936 
937         #[cfg(all(tokio_unstable, feature = "tracing"))]
938         guard.resource_span.in_scope(|| {
939             tracing::trace!(
940             target: "runtime::resource::state_update",
941             write_locked = true,
942             write_locked.op = "override",
943             )
944         });
945 
946         guard
947     }
948 
949     /// Attempts to acquire this `RwLock` with exclusive write access.
950     ///
951     /// If the access couldn't be acquired immediately, returns [`TryLockError`].
952     /// Otherwise, an RAII guard is returned which will release write access
953     /// when dropped.
954     ///
955     /// [`TryLockError`]: TryLockError
956     ///
957     /// # Examples
958     ///
959     /// ```
960     /// use tokio::sync::RwLock;
961     ///
962     /// #[tokio::main]
963     /// async fn main() {
964     ///     let rw = RwLock::new(1);
965     ///
966     ///     let v = rw.read().await;
967     ///     assert_eq!(*v, 1);
968     ///
969     ///     assert!(rw.try_write().is_err());
970     /// }
971     /// ```
try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError>972     pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
973         match self.s.try_acquire(self.mr as usize) {
974             Ok(permit) => permit,
975             Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
976             Err(TryAcquireError::Closed) => unreachable!(),
977         }
978 
979         let guard = RwLockWriteGuard {
980             permits_acquired: self.mr,
981             s: &self.s,
982             data: self.c.get(),
983             marker: marker::PhantomData,
984             #[cfg(all(tokio_unstable, feature = "tracing"))]
985             resource_span: self.resource_span.clone(),
986         };
987 
988         #[cfg(all(tokio_unstable, feature = "tracing"))]
989         self.resource_span.in_scope(|| {
990             tracing::trace!(
991             target: "runtime::resource::state_update",
992             write_locked = true,
993             write_locked.op = "override",
994             )
995         });
996 
997         Ok(guard)
998     }
999 
1000     /// Attempts to acquire this `RwLock` with exclusive write access.
1001     ///
1002     /// If the access couldn't be acquired immediately, returns [`TryLockError`].
1003     /// Otherwise, an RAII guard is returned which will release write access
1004     /// when dropped.
1005     ///
1006     /// This method is identical to [`RwLock::try_write`], except that the
1007     /// returned guard references the `RwLock` with an [`Arc`] rather than by
1008     /// borrowing it. Therefore, the `RwLock` must be wrapped in an `Arc` to
1009     /// call this method, and the guard will live for the `'static` lifetime,
1010     /// as it keeps the `RwLock` alive by holding an `Arc`.
1011     ///
1012     /// [`TryLockError`]: TryLockError
1013     ///
1014     /// # Examples
1015     ///
1016     /// ```
1017     /// use std::sync::Arc;
1018     /// use tokio::sync::RwLock;
1019     ///
1020     /// #[tokio::main]
1021     /// async fn main() {
1022     ///     let rw = Arc::new(RwLock::new(1));
1023     ///
1024     ///     let v = Arc::clone(&rw).read_owned().await;
1025     ///     assert_eq!(*v, 1);
1026     ///
1027     ///     assert!(rw.try_write_owned().is_err());
1028     /// }
1029     /// ```
try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError>1030     pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
1031         match self.s.try_acquire(self.mr as usize) {
1032             Ok(permit) => permit,
1033             Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
1034             Err(TryAcquireError::Closed) => unreachable!(),
1035         }
1036 
1037         let guard = OwnedRwLockWriteGuard {
1038             #[cfg(all(tokio_unstable, feature = "tracing"))]
1039             resource_span: self.resource_span.clone(),
1040             permits_acquired: self.mr,
1041             data: self.c.get(),
1042             lock: self,
1043             _p: PhantomData,
1044         };
1045 
1046         #[cfg(all(tokio_unstable, feature = "tracing"))]
1047         guard.resource_span.in_scope(|| {
1048             tracing::trace!(
1049             target: "runtime::resource::state_update",
1050             write_locked = true,
1051             write_locked.op = "override",
1052             )
1053         });
1054 
1055         Ok(guard)
1056     }
1057 
1058     /// Returns a mutable reference to the underlying data.
1059     ///
1060     /// Since this call borrows the `RwLock` mutably, no actual locking needs to
1061     /// take place -- the mutable borrow statically guarantees no locks exist.
1062     ///
1063     /// # Examples
1064     ///
1065     /// ```
1066     /// use tokio::sync::RwLock;
1067     ///
1068     /// fn main() {
1069     ///     let mut lock = RwLock::new(1);
1070     ///
1071     ///     let n = lock.get_mut();
1072     ///     *n = 2;
1073     /// }
1074     /// ```
get_mut(&mut self) -> &mut T1075     pub fn get_mut(&mut self) -> &mut T {
1076         unsafe {
1077             // Safety: This is https://github.com/rust-lang/rust/pull/76936
1078             &mut *self.c.get()
1079         }
1080     }
1081 
1082     /// Consumes the lock, returning the underlying data.
into_inner(self) -> T where T: Sized,1083     pub fn into_inner(self) -> T
1084     where
1085         T: Sized,
1086     {
1087         self.c.into_inner()
1088     }
1089 }
1090 
1091 impl<T> From<T> for RwLock<T> {
from(s: T) -> Self1092     fn from(s: T) -> Self {
1093         Self::new(s)
1094     }
1095 }
1096 
1097 impl<T: ?Sized> Default for RwLock<T>
1098 where
1099     T: Default,
1100 {
default() -> Self1101     fn default() -> Self {
1102         Self::new(T::default())
1103     }
1104 }
1105 
1106 impl<T: ?Sized> std::fmt::Debug for RwLock<T>
1107 where
1108     T: std::fmt::Debug,
1109 {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1110     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1111         let mut d = f.debug_struct("RwLock");
1112         match self.try_read() {
1113             Ok(inner) => d.field("data", &&*inner),
1114             Err(_) => d.field("data", &format_args!("<locked>")),
1115         };
1116         d.finish()
1117     }
1118 }
1119