1 //! Futures-powered synchronization primitives.
2 
3 use alloc::boxed::Box;
4 use alloc::sync::Arc;
5 use core::cell::UnsafeCell;
6 use core::ops::{Deref, DerefMut};
7 use core::pin::Pin;
8 use core::sync::atomic::AtomicPtr;
9 use core::sync::atomic::Ordering::SeqCst;
10 use core::{fmt, ptr};
11 #[cfg(feature = "bilock")]
12 use futures_core::future::Future;
13 use futures_core::task::{Context, Poll, Waker};
14 
15 /// A type of futures-powered synchronization primitive which is a mutex between
16 /// two possible owners.
17 ///
18 /// This primitive is not as generic as a full-blown mutex but is sufficient for
19 /// many use cases where there are only two possible owners of a resource. The
20 /// implementation of `BiLock` can be more optimized for just the two possible
21 /// owners.
22 ///
23 /// Note that it's possible to use this lock through a poll-style interface with
24 /// the `poll_lock` method but you can also use it as a future with the `lock`
25 /// method that consumes a `BiLock` and returns a future that will resolve when
26 /// it's locked.
27 ///
28 /// A `BiLock` is typically used for "split" operations where data which serves
29 /// two purposes wants to be split into two to be worked with separately. For
30 /// example a TCP stream could be both a reader and a writer or a framing layer
31 /// could be both a stream and a sink for messages. A `BiLock` enables splitting
32 /// these two and then using each independently in a futures-powered fashion.
33 ///
34 /// This type is only available when the `bilock` feature of this
35 /// library is activated.
36 #[derive(Debug)]
37 #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
38 pub struct BiLock<T> {
39     arc: Arc<Inner<T>>,
40 }
41 
42 #[derive(Debug)]
43 struct Inner<T> {
44     state: AtomicPtr<Waker>,
45     value: Option<UnsafeCell<T>>,
46 }
47 
48 unsafe impl<T: Send> Send for Inner<T> {}
49 unsafe impl<T: Send> Sync for Inner<T> {}
50 
51 impl<T> BiLock<T> {
52     /// Creates a new `BiLock` protecting the provided data.
53     ///
54     /// Two handles to the lock are returned, and these are the only two handles
55     /// that will ever be available to the lock. These can then be sent to separate
56     /// tasks to be managed there.
57     ///
58     /// The data behind the bilock is considered to be pinned, which allows `Pin`
59     /// references to locked data. However, this means that the locked value
60     /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`.
61     /// Similarly, reuniting the lock and extracting the inner value is only
62     /// possible when `T` is `Unpin`.
new(t: T) -> (Self, Self)63     pub fn new(t: T) -> (Self, Self) {
64         let arc = Arc::new(Inner {
65             state: AtomicPtr::new(ptr::null_mut()),
66             value: Some(UnsafeCell::new(t)),
67         });
68 
69         (Self { arc: arc.clone() }, Self { arc })
70     }
71 
72     /// Attempt to acquire this lock, returning `Pending` if it can't be
73     /// acquired.
74     ///
75     /// This function will acquire the lock in a nonblocking fashion, returning
76     /// immediately if the lock is already held. If the lock is successfully
77     /// acquired then `Poll::Ready` is returned with a value that represents
78     /// the locked value (and can be used to access the protected data). The
79     /// lock is unlocked when the returned `BiLockGuard` is dropped.
80     ///
81     /// If the lock is already held then this function will return
82     /// `Poll::Pending`. In this case the current task will also be scheduled
83     /// to receive a notification when the lock would otherwise become
84     /// available.
85     ///
86     /// # Panics
87     ///
88     /// This function will panic if called outside the context of a future's
89     /// task.
poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>>90     pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
91         let mut waker = None;
92         loop {
93             let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
94             match n as usize {
95                 // Woohoo, we grabbed the lock!
96                 0 => return Poll::Ready(BiLockGuard { bilock: self }),
97 
98                 // Oops, someone else has locked the lock
99                 1 => {}
100 
101                 // A task was previously blocked on this lock, likely our task,
102                 // so we need to update that task.
103                 _ => unsafe {
104                     let mut prev = Box::from_raw(n);
105                     *prev = cx.waker().clone();
106                     waker = Some(prev);
107                 },
108             }
109 
110             // type ascription for safety's sake!
111             let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
112             let me = Box::into_raw(me);
113 
114             match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
115                 // The lock is still locked, but we've now parked ourselves, so
116                 // just report that we're scheduled to receive a notification.
117                 Ok(_) => return Poll::Pending,
118 
119                 // Oops, looks like the lock was unlocked after our swap above
120                 // and before the compare_exchange. Deallocate what we just
121                 // allocated and go through the loop again.
122                 Err(n) if n.is_null() => unsafe {
123                     waker = Some(Box::from_raw(me));
124                 },
125 
126                 // The top of this loop set the previous state to 1, so if we
127                 // failed the CAS above then it's because the previous value was
128                 // *not* zero or one. This indicates that a task was blocked,
129                 // but we're trying to acquire the lock and there's only one
130                 // other reference of the lock, so it should be impossible for
131                 // that task to ever block itself.
132                 Err(n) => panic!("invalid state: {}", n as usize),
133             }
134         }
135     }
136 
137     /// Perform a "blocking lock" of this lock, consuming this lock handle and
138     /// returning a future to the acquired lock.
139     ///
140     /// This function consumes the `BiLock<T>` and returns a sentinel future,
141     /// `BiLockAcquire<T>`. The returned future will resolve to
142     /// `BiLockAcquired<T>` which represents a locked lock similarly to
143     /// `BiLockGuard<T>`.
144     ///
145     /// Note that the returned future will never resolve to an error.
146     #[cfg(feature = "bilock")]
147     #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
lock(&self) -> BiLockAcquire<'_, T>148     pub fn lock(&self) -> BiLockAcquire<'_, T> {
149         BiLockAcquire { bilock: self }
150     }
151 
152     /// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`.
is_pair_of(&self, other: &Self) -> bool153     pub fn is_pair_of(&self, other: &Self) -> bool {
154         Arc::ptr_eq(&self.arc, &other.arc)
155     }
156 
157     /// Attempts to put the two "halves" of a `BiLock<T>` back together and
158     /// recover the original value. Succeeds only if the two `BiLock<T>`s
159     /// originated from the same call to `BiLock::new`.
reunite(self, other: Self) -> Result<T, ReuniteError<T>> where T: Unpin,160     pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>>
161     where
162         T: Unpin,
163     {
164         if self.is_pair_of(&other) {
165             drop(other);
166             let inner = Arc::try_unwrap(self.arc)
167                 .ok()
168                 .expect("futures: try_unwrap failed in BiLock<T>::reunite");
169             Ok(unsafe { inner.into_value() })
170         } else {
171             Err(ReuniteError(self, other))
172         }
173     }
174 
unlock(&self)175     fn unlock(&self) {
176         let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
177         match n as usize {
178             // we've locked the lock, shouldn't be possible for us to see an
179             // unlocked lock.
180             0 => panic!("invalid unlocked state"),
181 
182             // Ok, no one else tried to get the lock, we're done.
183             1 => {}
184 
185             // Another task has parked themselves on this lock, let's wake them
186             // up as its now their turn.
187             _ => unsafe {
188                 Box::from_raw(n).wake();
189             },
190         }
191     }
192 }
193 
194 impl<T: Unpin> Inner<T> {
into_value(mut self) -> T195     unsafe fn into_value(mut self) -> T {
196         self.value.take().unwrap().into_inner()
197     }
198 }
199 
200 impl<T> Drop for Inner<T> {
drop(&mut self)201     fn drop(&mut self) {
202         assert!(self.state.load(SeqCst).is_null());
203     }
204 }
205 
206 /// Error indicating two `BiLock<T>`s were not two halves of a whole, and
207 /// thus could not be `reunite`d.
208 #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
209 pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
210 
211 impl<T> fmt::Debug for ReuniteError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result212     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213         f.debug_tuple("ReuniteError").field(&"...").finish()
214     }
215 }
216 
217 impl<T> fmt::Display for ReuniteError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result218     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219         write!(f, "tried to reunite two BiLocks that don't form a pair")
220     }
221 }
222 
223 #[cfg(feature = "std")]
224 impl<T: core::any::Any> std::error::Error for ReuniteError<T> {}
225 
226 /// Returned RAII guard from the `poll_lock` method.
227 ///
228 /// This structure acts as a sentinel to the data in the `BiLock<T>` itself,
229 /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
230 /// unlocked.
231 #[derive(Debug)]
232 #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
233 pub struct BiLockGuard<'a, T> {
234     bilock: &'a BiLock<T>,
235 }
236 
237 // We allow parallel access to T via Deref, so Sync bound is also needed here.
238 unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {}
239 
240 impl<T> Deref for BiLockGuard<'_, T> {
241     type Target = T;
deref(&self) -> &T242     fn deref(&self) -> &T {
243         unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() }
244     }
245 }
246 
247 impl<T: Unpin> DerefMut for BiLockGuard<'_, T> {
deref_mut(&mut self) -> &mut T248     fn deref_mut(&mut self) -> &mut T {
249         unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() }
250     }
251 }
252 
253 impl<T> BiLockGuard<'_, T> {
254     /// Get a mutable pinned reference to the locked value.
as_pin_mut(&mut self) -> Pin<&mut T>255     pub fn as_pin_mut(&mut self) -> Pin<&mut T> {
256         // Safety: we never allow moving a !Unpin value out of a bilock, nor
257         // allow mutable access to it
258         unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) }
259     }
260 }
261 
262 impl<T> Drop for BiLockGuard<'_, T> {
drop(&mut self)263     fn drop(&mut self) {
264         self.bilock.unlock();
265     }
266 }
267 
268 /// Future returned by `BiLock::lock` which will resolve when the lock is
269 /// acquired.
270 #[cfg(feature = "bilock")]
271 #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
272 #[must_use = "futures do nothing unless you `.await` or poll them"]
273 #[derive(Debug)]
274 pub struct BiLockAcquire<'a, T> {
275     bilock: &'a BiLock<T>,
276 }
277 
278 // Pinning is never projected to fields
279 #[cfg(feature = "bilock")]
280 impl<T> Unpin for BiLockAcquire<'_, T> {}
281 
282 #[cfg(feature = "bilock")]
283 impl<'a, T> Future for BiLockAcquire<'a, T> {
284     type Output = BiLockGuard<'a, T>;
285 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>286     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287         self.bilock.poll_lock(cx)
288     }
289 }
290 
291 // Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible.
292 #[allow(clippy::useless_transmute)]
293 #[inline]
invalid_ptr<T>(addr: usize) -> *mut T294 fn invalid_ptr<T>(addr: usize) -> *mut T {
295     // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that
296     // pointer).
297     unsafe { core::mem::transmute(addr) }
298 }
299