xref: /aosp_15_r20/external/crosvm/cros_async/src/sync/waiter.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::UnsafeCell;
6 use std::future::Future;
7 use std::mem;
8 use std::pin::Pin;
9 use std::ptr::NonNull;
10 use std::sync::atomic::AtomicBool;
11 use std::sync::atomic::AtomicU8;
12 use std::sync::atomic::Ordering;
13 use std::sync::Arc;
14 use std::task::Context;
15 use std::task::Poll;
16 use std::task::Waker;
17 
18 use intrusive_collections::intrusive_adapter;
19 use intrusive_collections::linked_list::LinkedList;
20 use intrusive_collections::linked_list::LinkedListOps;
21 use intrusive_collections::DefaultLinkOps;
22 use intrusive_collections::LinkOps;
23 
24 use super::super::sync::SpinLock;
25 
// Atomic flavor of a `LinkedListLink`. Background on why this exists is in
// https://github.com/Amanieu/intrusive-rs/issues/47.
//
// NOTE(review): the 128-byte alignment presumably keeps each link on its own cache line(s) to
// avoid false sharing -- confirm before relying on it.
#[repr(align(128))]
pub struct AtomicLink {
    // Neighboring links; only touched through the unsafe `LinkedListOps` accessors.
    prev: UnsafeCell<Option<NonNull<AtomicLink>>>,
    next: UnsafeCell<Option<NonNull<AtomicLink>>>,
    // Set while the link is claimed by a list (see `acquire_link` / `release_link`).
    linked: AtomicBool,
}

impl AtomicLink {
    // Builds a link that belongs to no list: no neighbors and the `linked` flag cleared.
    fn new() -> AtomicLink {
        Self {
            prev: UnsafeCell::new(None),
            next: UnsafeCell::new(None),
            linked: AtomicBool::new(false),
        }
    }

    // Reports the current value of the `linked` flag using a relaxed load.
    fn is_linked(&self) -> bool {
        let flag = &self.linked;
        flag.load(Ordering::Relaxed)
    }
}
48 
49 impl DefaultLinkOps for AtomicLink {
50     type Ops = AtomicLinkOps;
51 
52     const NEW: Self::Ops = AtomicLinkOps;
53 }
54 
55 // SAFETY:
56 // Safe because the only way to mutate `AtomicLink` is via the `LinkedListOps` trait whose methods
57 // are all unsafe and require that the caller has first called `acquire_link` (and had it return
58 // true) to use them safely.
59 unsafe impl Send for AtomicLink {}
60 // SAFETY: See safety comment for impl Send
61 unsafe impl Sync for AtomicLink {}
62 
// Stateless (unit struct) marker type on which the link operations for `AtomicLink` are
// implemented.
#[derive(Clone, Copy, Default)]
pub struct AtomicLinkOps;
65 
66 // TODO(b/315998194): Add safety comment
67 #[allow(clippy::undocumented_unsafe_blocks)]
68 unsafe impl LinkOps for AtomicLinkOps {
69     type LinkPtr = NonNull<AtomicLink>;
70 
acquire_link(&mut self, ptr: Self::LinkPtr) -> bool71     unsafe fn acquire_link(&mut self, ptr: Self::LinkPtr) -> bool {
72         !ptr.as_ref().linked.swap(true, Ordering::Acquire)
73     }
74 
release_link(&mut self, ptr: Self::LinkPtr)75     unsafe fn release_link(&mut self, ptr: Self::LinkPtr) {
76         ptr.as_ref().linked.store(false, Ordering::Release)
77     }
78 }
79 
80 // TODO(b/315998194): Add safety comment
81 #[allow(clippy::undocumented_unsafe_blocks)]
82 unsafe impl LinkedListOps for AtomicLinkOps {
next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>83     unsafe fn next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
84         *ptr.as_ref().next.get()
85     }
86 
prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>87     unsafe fn prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
88         *ptr.as_ref().prev.get()
89     }
90 
set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>)91     unsafe fn set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>) {
92         *ptr.as_ref().next.get() = next;
93     }
94 
set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>)95     unsafe fn set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>) {
96         *ptr.as_ref().prev.get() = prev;
97     }
98 }
99 
// The kind of lock a `Waiter` is waiting to acquire.
#[derive(Copy, Clone)]
pub enum Kind {
    // Waiting for a shared lock.
    Shared,
    // Waiting for an exclusive lock.
    Exclusive,
}
105 
// Progress of a `Waiter` through a single wait, protected by the waiter's `state` lock.
enum State {
    // Starting point: set by `Waiter::new` and `Waiter::reset`; no waker recorded yet.
    Init,
    // `WaitFuture::poll` returned `Pending` and stored the waker to notify.
    Waiting(Waker),
    // `Waiter::wake` has run; the next poll returns `Ready`.
    Woken,
    // `WaitFuture::poll` has returned `Ready`.
    Finished,
    // Placeholder that `WaitFuture::poll` swaps in while it works out the next state.
    Processing,
}
113 
// Identifies the queue a waiter currently belongs to. The Mutex and Condvar implementations are
// responsible for keeping this value up to date whenever they add a `Waiter` to, or remove it
// from, their respective waiter lists.
#[repr(u8)]
#[derive(Debug, PartialEq, Eq)]
pub enum WaitingFor {
    // The waiter is either not linked into a waiter list or it is linked into a temporary list.
    None = 0,
    // The waiter is linked into the Mutex's waiter list.
    Mutex = 1,
    // The waiter is linked into the Condvar's waiter list.
    Condvar = 2,
}
127 
128 // Represents a thread currently blocked on a Condvar or on acquiring a Mutex.
129 pub struct Waiter {
130     link: AtomicLink,
131     state: SpinLock<State>,
132     cancel: fn(usize, &Waiter, bool),
133     cancel_data: usize,
134     kind: Kind,
135     waiting_for: AtomicU8,
136 }
137 
138 impl Waiter {
139     // Create a new, initialized Waiter.
140     //
141     // `kind` should indicate whether this waiter represent a thread that is waiting for a shared
142     // lock or an exclusive lock.
143     //
144     // `cancel` is the function that is called when a `WaitFuture` (returned by the `wait()`
145     // function) is dropped before it can complete. `cancel_data` is used as the first parameter of
146     // the `cancel` function. The second parameter is the `Waiter` that was canceled and the third
147     // parameter indicates whether the `WaitFuture` was dropped after it was woken (but before it
148     // was polled to completion). A value of `false` for the third parameter may already be stale
149     // by the time the cancel function runs and so does not guarantee that the waiter was not woken.
150     // In this case, implementations should still check if the Waiter was woken. However, a value of
151     // `true` guarantees that the waiter was already woken up so no additional checks are necessary.
152     // In this case, the cancel implementation should wake up the next waiter in its wait list, if
153     // any.
154     //
155     // `waiting_for` indicates the waiter list to which this `Waiter` will be added. See the
156     // documentation of the `WaitingFor` enum for the meaning of the different values.
new( kind: Kind, cancel: fn(usize, &Waiter, bool), cancel_data: usize, waiting_for: WaitingFor, ) -> Waiter157     pub fn new(
158         kind: Kind,
159         cancel: fn(usize, &Waiter, bool),
160         cancel_data: usize,
161         waiting_for: WaitingFor,
162     ) -> Waiter {
163         Waiter {
164             link: AtomicLink::new(),
165             state: SpinLock::new(State::Init),
166             cancel,
167             cancel_data,
168             kind,
169             waiting_for: AtomicU8::new(waiting_for as u8),
170         }
171     }
172 
173     // The kind of lock that this `Waiter` is waiting to acquire.
kind(&self) -> Kind174     pub fn kind(&self) -> Kind {
175         self.kind
176     }
177 
178     // Returns true if this `Waiter` is currently linked into a waiter list.
is_linked(&self) -> bool179     pub fn is_linked(&self) -> bool {
180         self.link.is_linked()
181     }
182 
183     // Indicates the waiter list to which this `Waiter` belongs.
is_waiting_for(&self) -> WaitingFor184     pub fn is_waiting_for(&self) -> WaitingFor {
185         match self.waiting_for.load(Ordering::Acquire) {
186             0 => WaitingFor::None,
187             1 => WaitingFor::Mutex,
188             2 => WaitingFor::Condvar,
189             v => panic!("Unknown value for `WaitingFor`: {}", v),
190         }
191     }
192 
193     // Change the waiter list to which this `Waiter` belongs. This will panic if called when the
194     // `Waiter` is still linked into a waiter list.
set_waiting_for(&self, waiting_for: WaitingFor)195     pub fn set_waiting_for(&self, waiting_for: WaitingFor) {
196         self.waiting_for.store(waiting_for as u8, Ordering::Release);
197     }
198 
199     // Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a
200     // waiter list.
reset(&self, waiting_for: WaitingFor)201     pub fn reset(&self, waiting_for: WaitingFor) {
202         debug_assert!(!self.is_linked(), "Cannot reset `Waiter` while linked");
203         self.set_waiting_for(waiting_for);
204 
205         let mut state = self.state.lock();
206         if let State::Waiting(waker) = mem::replace(&mut *state, State::Init) {
207             mem::drop(state);
208             mem::drop(waker);
209         }
210     }
211 
212     // Wait until woken up by another thread.
wait(&self) -> WaitFuture<'_>213     pub fn wait(&self) -> WaitFuture<'_> {
214         WaitFuture { waiter: self }
215     }
216 
217     // Wake up the thread associated with this `Waiter`. Panics if `waiting_for()` does not return
218     // `WaitingFor::None` or if `is_linked()` returns true.
wake(&self)219     pub fn wake(&self) {
220         debug_assert!(!self.is_linked(), "Cannot wake `Waiter` while linked");
221         debug_assert_eq!(self.is_waiting_for(), WaitingFor::None);
222 
223         let mut state = self.state.lock();
224 
225         if let State::Waiting(waker) = mem::replace(&mut *state, State::Woken) {
226             mem::drop(state);
227             waker.wake();
228         }
229     }
230 }
231 
232 pub struct WaitFuture<'w> {
233     waiter: &'w Waiter,
234 }
235 
236 impl<'w> Future for WaitFuture<'w> {
237     type Output = ();
238 
poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>239     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
240         let mut state = self.waiter.state.lock();
241 
242         match mem::replace(&mut *state, State::Processing) {
243             State::Init => {
244                 *state = State::Waiting(cx.waker().clone());
245 
246                 Poll::Pending
247             }
248             State::Waiting(old_waker) => {
249                 *state = State::Waiting(cx.waker().clone());
250                 mem::drop(state);
251                 mem::drop(old_waker);
252 
253                 Poll::Pending
254             }
255             State::Woken => {
256                 *state = State::Finished;
257                 Poll::Ready(())
258             }
259             State::Finished => {
260                 panic!("Future polled after returning Poll::Ready");
261             }
262             State::Processing => {
263                 panic!("Unexpected waker state");
264             }
265         }
266     }
267 }
268 
269 impl<'w> Drop for WaitFuture<'w> {
drop(&mut self)270     fn drop(&mut self) {
271         let state = self.waiter.state.lock();
272 
273         match *state {
274             State::Finished => {}
275             State::Processing => panic!("Unexpected waker state"),
276             State::Woken => {
277                 mem::drop(state);
278 
279                 // We were woken but not polled.  Wake up the next waiter.
280                 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, true);
281             }
282             _ => {
283                 mem::drop(state);
284 
285                 // Not woken.  No need to wake up any waiters.
286                 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, false);
287             }
288         }
289     }
290 }
291 
292 intrusive_adapter!(pub WaiterAdapter = Arc<Waiter>: Waiter { link: AtomicLink });
293 
294 pub type WaiterList = LinkedList<WaiterAdapter>;
295