1 //! Bounded channel based on a preallocated array.
2 //!
3 //! This flavor has a fixed, positive capacity.
4 //!
5 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
6 //!
7 //! Source:
8 //!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
9 //!   - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
10 
11 use std::boxed::Box;
12 use std::cell::UnsafeCell;
13 use std::mem::{self, MaybeUninit};
14 use std::ptr;
15 use std::sync::atomic::{self, AtomicUsize, Ordering};
16 use std::time::Instant;
17 
18 use crossbeam_utils::{Backoff, CachePadded};
19 
20 use crate::context::Context;
21 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
22 use crate::select::{Operation, SelectHandle, Selected, Token};
23 use crate::waker::SyncWaker;
24 
25 /// A slot in a channel.
26 struct Slot<T> {
27     /// The current stamp.
28     stamp: AtomicUsize,
29 
30     /// The message in this slot.
31     msg: UnsafeCell<MaybeUninit<T>>,
32 }
33 
34 /// The token type for the array flavor.
35 #[derive(Debug)]
36 pub(crate) struct ArrayToken {
37     /// Slot to read from or write to.
38     slot: *const u8,
39 
40     /// Stamp to store into the slot after reading or writing.
41     stamp: usize,
42 }
43 
44 impl Default for ArrayToken {
45     #[inline]
default() -> Self46     fn default() -> Self {
47         ArrayToken {
48             slot: ptr::null(),
49             stamp: 0,
50         }
51     }
52 }
53 
54 /// Bounded channel based on a preallocated array.
55 pub(crate) struct Channel<T> {
56     /// The head of the channel.
57     ///
58     /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
59     /// packed into a single `usize`. The lower bits represent the index, while the upper bits
60     /// represent the lap. The mark bit in the head is always zero.
61     ///
62     /// Messages are popped from the head of the channel.
63     head: CachePadded<AtomicUsize>,
64 
65     /// The tail of the channel.
66     ///
67     /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
68     /// packed into a single `usize`. The lower bits represent the index, while the upper bits
69     /// represent the lap. The mark bit indicates that the channel is disconnected.
70     ///
71     /// Messages are pushed into the tail of the channel.
72     tail: CachePadded<AtomicUsize>,
73 
74     /// The buffer holding slots.
75     buffer: Box<[Slot<T>]>,
76 
77     /// The channel capacity.
78     cap: usize,
79 
80     /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
81     one_lap: usize,
82 
83     /// If this bit is set in the tail, that means the channel is disconnected.
84     mark_bit: usize,
85 
86     /// Senders waiting while the channel is full.
87     senders: SyncWaker,
88 
89     /// Receivers waiting while the channel is empty and not disconnected.
90     receivers: SyncWaker,
91 }
92 
93 impl<T> Channel<T> {
94     /// Creates a bounded channel of capacity `cap`.
with_capacity(cap: usize) -> Self95     pub(crate) fn with_capacity(cap: usize) -> Self {
96         assert!(cap > 0, "capacity must be positive");
97 
98         // Compute constants `mark_bit` and `one_lap`.
99         let mark_bit = (cap + 1).next_power_of_two();
100         let one_lap = mark_bit * 2;
101 
102         // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
103         let head = 0;
104         // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
105         let tail = 0;
106 
107         // Allocate a buffer of `cap` slots initialized
108         // with stamps.
109         let buffer: Box<[Slot<T>]> = (0..cap)
110             .map(|i| {
111                 // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
112                 Slot {
113                     stamp: AtomicUsize::new(i),
114                     msg: UnsafeCell::new(MaybeUninit::uninit()),
115                 }
116             })
117             .collect();
118 
119         Channel {
120             buffer,
121             cap,
122             one_lap,
123             mark_bit,
124             head: CachePadded::new(AtomicUsize::new(head)),
125             tail: CachePadded::new(AtomicUsize::new(tail)),
126             senders: SyncWaker::new(),
127             receivers: SyncWaker::new(),
128         }
129     }
130 
131     /// Returns a receiver handle to the channel.
receiver(&self) -> Receiver<'_, T>132     pub(crate) fn receiver(&self) -> Receiver<'_, T> {
133         Receiver(self)
134     }
135 
136     /// Returns a sender handle to the channel.
sender(&self) -> Sender<'_, T>137     pub(crate) fn sender(&self) -> Sender<'_, T> {
138         Sender(self)
139     }
140 
141     /// Attempts to reserve a slot for sending a message.
start_send(&self, token: &mut Token) -> bool142     fn start_send(&self, token: &mut Token) -> bool {
143         let backoff = Backoff::new();
144         let mut tail = self.tail.load(Ordering::Relaxed);
145 
146         loop {
147             // Check if the channel is disconnected.
148             if tail & self.mark_bit != 0 {
149                 token.array.slot = ptr::null();
150                 token.array.stamp = 0;
151                 return true;
152             }
153 
154             // Deconstruct the tail.
155             let index = tail & (self.mark_bit - 1);
156             let lap = tail & !(self.one_lap - 1);
157 
158             // Inspect the corresponding slot.
159             debug_assert!(index < self.buffer.len());
160             let slot = unsafe { self.buffer.get_unchecked(index) };
161             let stamp = slot.stamp.load(Ordering::Acquire);
162 
163             // If the tail and the stamp match, we may attempt to push.
164             if tail == stamp {
165                 let new_tail = if index + 1 < self.cap {
166                     // Same lap, incremented index.
167                     // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
168                     tail + 1
169                 } else {
170                     // One lap forward, index wraps around to zero.
171                     // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
172                     lap.wrapping_add(self.one_lap)
173                 };
174 
175                 // Try moving the tail.
176                 match self.tail.compare_exchange_weak(
177                     tail,
178                     new_tail,
179                     Ordering::SeqCst,
180                     Ordering::Relaxed,
181                 ) {
182                     Ok(_) => {
183                         // Prepare the token for the follow-up call to `write`.
184                         token.array.slot = slot as *const Slot<T> as *const u8;
185                         token.array.stamp = tail + 1;
186                         return true;
187                     }
188                     Err(t) => {
189                         tail = t;
190                         backoff.spin();
191                     }
192                 }
193             } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
194                 atomic::fence(Ordering::SeqCst);
195                 let head = self.head.load(Ordering::Relaxed);
196 
197                 // If the head lags one lap behind the tail as well...
198                 if head.wrapping_add(self.one_lap) == tail {
199                     // ...then the channel is full.
200                     return false;
201                 }
202 
203                 backoff.spin();
204                 tail = self.tail.load(Ordering::Relaxed);
205             } else {
206                 // Snooze because we need to wait for the stamp to get updated.
207                 backoff.snooze();
208                 tail = self.tail.load(Ordering::Relaxed);
209             }
210         }
211     }
212 
213     /// Writes a message into the channel.
write(&self, token: &mut Token, msg: T) -> Result<(), T>214     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
215         // If there is no slot, the channel is disconnected.
216         if token.array.slot.is_null() {
217             return Err(msg);
218         }
219 
220         let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
221 
222         // Write the message into the slot and update the stamp.
223         slot.msg.get().write(MaybeUninit::new(msg));
224         slot.stamp.store(token.array.stamp, Ordering::Release);
225 
226         // Wake a sleeping receiver.
227         self.receivers.notify();
228         Ok(())
229     }
230 
231     /// Attempts to reserve a slot for receiving a message.
start_recv(&self, token: &mut Token) -> bool232     fn start_recv(&self, token: &mut Token) -> bool {
233         let backoff = Backoff::new();
234         let mut head = self.head.load(Ordering::Relaxed);
235 
236         loop {
237             // Deconstruct the head.
238             let index = head & (self.mark_bit - 1);
239             let lap = head & !(self.one_lap - 1);
240 
241             // Inspect the corresponding slot.
242             debug_assert!(index < self.buffer.len());
243             let slot = unsafe { self.buffer.get_unchecked(index) };
244             let stamp = slot.stamp.load(Ordering::Acquire);
245 
246             // If the stamp is ahead of the head by 1, we may attempt to pop.
247             if head + 1 == stamp {
248                 let new = if index + 1 < self.cap {
249                     // Same lap, incremented index.
250                     // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
251                     head + 1
252                 } else {
253                     // One lap forward, index wraps around to zero.
254                     // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
255                     lap.wrapping_add(self.one_lap)
256                 };
257 
258                 // Try moving the head.
259                 match self.head.compare_exchange_weak(
260                     head,
261                     new,
262                     Ordering::SeqCst,
263                     Ordering::Relaxed,
264                 ) {
265                     Ok(_) => {
266                         // Prepare the token for the follow-up call to `read`.
267                         token.array.slot = slot as *const Slot<T> as *const u8;
268                         token.array.stamp = head.wrapping_add(self.one_lap);
269                         return true;
270                     }
271                     Err(h) => {
272                         head = h;
273                         backoff.spin();
274                     }
275                 }
276             } else if stamp == head {
277                 atomic::fence(Ordering::SeqCst);
278                 let tail = self.tail.load(Ordering::Relaxed);
279 
280                 // If the tail equals the head, that means the channel is empty.
281                 if (tail & !self.mark_bit) == head {
282                     // If the channel is disconnected...
283                     if tail & self.mark_bit != 0 {
284                         // ...then receive an error.
285                         token.array.slot = ptr::null();
286                         token.array.stamp = 0;
287                         return true;
288                     } else {
289                         // Otherwise, the receive operation is not ready.
290                         return false;
291                     }
292                 }
293 
294                 backoff.spin();
295                 head = self.head.load(Ordering::Relaxed);
296             } else {
297                 // Snooze because we need to wait for the stamp to get updated.
298                 backoff.snooze();
299                 head = self.head.load(Ordering::Relaxed);
300             }
301         }
302     }
303 
304     /// Reads a message from the channel.
read(&self, token: &mut Token) -> Result<T, ()>305     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
306         if token.array.slot.is_null() {
307             // The channel is disconnected.
308             return Err(());
309         }
310 
311         let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
312 
313         // Read the message from the slot and update the stamp.
314         let msg = slot.msg.get().read().assume_init();
315         slot.stamp.store(token.array.stamp, Ordering::Release);
316 
317         // Wake a sleeping sender.
318         self.senders.notify();
319         Ok(msg)
320     }
321 
322     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>323     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
324         let token = &mut Token::default();
325         if self.start_send(token) {
326             unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
327         } else {
328             Err(TrySendError::Full(msg))
329         }
330     }
331 
332     /// Sends a message into the channel.
send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>333     pub(crate) fn send(
334         &self,
335         msg: T,
336         deadline: Option<Instant>,
337     ) -> Result<(), SendTimeoutError<T>> {
338         let token = &mut Token::default();
339         loop {
340             // Try sending a message several times.
341             let backoff = Backoff::new();
342             loop {
343                 if self.start_send(token) {
344                     let res = unsafe { self.write(token, msg) };
345                     return res.map_err(SendTimeoutError::Disconnected);
346                 }
347 
348                 if backoff.is_completed() {
349                     break;
350                 } else {
351                     backoff.snooze();
352                 }
353             }
354 
355             if let Some(d) = deadline {
356                 if Instant::now() >= d {
357                     return Err(SendTimeoutError::Timeout(msg));
358                 }
359             }
360 
361             Context::with(|cx| {
362                 // Prepare for blocking until a receiver wakes us up.
363                 let oper = Operation::hook(token);
364                 self.senders.register(oper, cx);
365 
366                 // Has the channel become ready just now?
367                 if !self.is_full() || self.is_disconnected() {
368                     let _ = cx.try_select(Selected::Aborted);
369                 }
370 
371                 // Block the current thread.
372                 let sel = cx.wait_until(deadline);
373 
374                 match sel {
375                     Selected::Waiting => unreachable!(),
376                     Selected::Aborted | Selected::Disconnected => {
377                         self.senders.unregister(oper).unwrap();
378                     }
379                     Selected::Operation(_) => {}
380                 }
381             });
382         }
383     }
384 
385     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>386     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
387         let token = &mut Token::default();
388 
389         if self.start_recv(token) {
390             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
391         } else {
392             Err(TryRecvError::Empty)
393         }
394     }
395 
396     /// Receives a message from the channel.
recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>397     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
398         let token = &mut Token::default();
399         loop {
400             // Try receiving a message several times.
401             let backoff = Backoff::new();
402             loop {
403                 if self.start_recv(token) {
404                     let res = unsafe { self.read(token) };
405                     return res.map_err(|_| RecvTimeoutError::Disconnected);
406                 }
407 
408                 if backoff.is_completed() {
409                     break;
410                 } else {
411                     backoff.snooze();
412                 }
413             }
414 
415             if let Some(d) = deadline {
416                 if Instant::now() >= d {
417                     return Err(RecvTimeoutError::Timeout);
418                 }
419             }
420 
421             Context::with(|cx| {
422                 // Prepare for blocking until a sender wakes us up.
423                 let oper = Operation::hook(token);
424                 self.receivers.register(oper, cx);
425 
426                 // Has the channel become ready just now?
427                 if !self.is_empty() || self.is_disconnected() {
428                     let _ = cx.try_select(Selected::Aborted);
429                 }
430 
431                 // Block the current thread.
432                 let sel = cx.wait_until(deadline);
433 
434                 match sel {
435                     Selected::Waiting => unreachable!(),
436                     Selected::Aborted | Selected::Disconnected => {
437                         self.receivers.unregister(oper).unwrap();
438                         // If the channel was disconnected, we still have to check for remaining
439                         // messages.
440                     }
441                     Selected::Operation(_) => {}
442                 }
443             });
444         }
445     }
446 
447     /// Returns the current number of messages inside the channel.
len(&self) -> usize448     pub(crate) fn len(&self) -> usize {
449         loop {
450             // Load the tail, then load the head.
451             let tail = self.tail.load(Ordering::SeqCst);
452             let head = self.head.load(Ordering::SeqCst);
453 
454             // If the tail didn't change, we've got consistent values to work with.
455             if self.tail.load(Ordering::SeqCst) == tail {
456                 let hix = head & (self.mark_bit - 1);
457                 let tix = tail & (self.mark_bit - 1);
458 
459                 return if hix < tix {
460                     tix - hix
461                 } else if hix > tix {
462                     self.cap - hix + tix
463                 } else if (tail & !self.mark_bit) == head {
464                     0
465                 } else {
466                     self.cap
467                 };
468             }
469         }
470     }
471 
472     /// Returns the capacity of the channel.
capacity(&self) -> Option<usize>473     pub(crate) fn capacity(&self) -> Option<usize> {
474         Some(self.cap)
475     }
476 
477     /// Disconnects the channel and wakes up all blocked senders and receivers.
478     ///
479     /// Returns `true` if this call disconnected the channel.
disconnect(&self) -> bool480     pub(crate) fn disconnect(&self) -> bool {
481         let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
482 
483         if tail & self.mark_bit == 0 {
484             self.senders.disconnect();
485             self.receivers.disconnect();
486             true
487         } else {
488             false
489         }
490     }
491 
492     /// Returns `true` if the channel is disconnected.
is_disconnected(&self) -> bool493     pub(crate) fn is_disconnected(&self) -> bool {
494         self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
495     }
496 
497     /// Returns `true` if the channel is empty.
is_empty(&self) -> bool498     pub(crate) fn is_empty(&self) -> bool {
499         let head = self.head.load(Ordering::SeqCst);
500         let tail = self.tail.load(Ordering::SeqCst);
501 
502         // Is the tail equal to the head?
503         //
504         // Note: If the head changes just before we load the tail, that means there was a moment
505         // when the channel was not empty, so it is safe to just return `false`.
506         (tail & !self.mark_bit) == head
507     }
508 
509     /// Returns `true` if the channel is full.
is_full(&self) -> bool510     pub(crate) fn is_full(&self) -> bool {
511         let tail = self.tail.load(Ordering::SeqCst);
512         let head = self.head.load(Ordering::SeqCst);
513 
514         // Is the head lagging one lap behind tail?
515         //
516         // Note: If the tail changes just before we load the head, that means there was a moment
517         // when the channel was not full, so it is safe to just return `false`.
518         head.wrapping_add(self.one_lap) == tail & !self.mark_bit
519     }
520 }
521 
522 impl<T> Drop for Channel<T> {
drop(&mut self)523     fn drop(&mut self) {
524         if mem::needs_drop::<T>() {
525             // Get the index of the head.
526             let head = *self.head.get_mut();
527             let tail = *self.tail.get_mut();
528 
529             let hix = head & (self.mark_bit - 1);
530             let tix = tail & (self.mark_bit - 1);
531 
532             let len = if hix < tix {
533                 tix - hix
534             } else if hix > tix {
535                 self.cap - hix + tix
536             } else if (tail & !self.mark_bit) == head {
537                 0
538             } else {
539                 self.cap
540             };
541 
542             // Loop over all slots that hold a message and drop them.
543             for i in 0..len {
544                 // Compute the index of the next slot holding a message.
545                 let index = if hix + i < self.cap {
546                     hix + i
547                 } else {
548                     hix + i - self.cap
549                 };
550 
551                 unsafe {
552                     debug_assert!(index < self.buffer.len());
553                     let slot = self.buffer.get_unchecked_mut(index);
554                     (*slot.msg.get()).assume_init_drop();
555                 }
556             }
557         }
558     }
559 }
560 
561 /// Receiver handle to a channel.
562 pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
563 
564 /// Sender handle to a channel.
565 pub(crate) struct Sender<'a, T>(&'a Channel<T>);
566 
567 impl<T> SelectHandle for Receiver<'_, T> {
try_select(&self, token: &mut Token) -> bool568     fn try_select(&self, token: &mut Token) -> bool {
569         self.0.start_recv(token)
570     }
571 
deadline(&self) -> Option<Instant>572     fn deadline(&self) -> Option<Instant> {
573         None
574     }
575 
register(&self, oper: Operation, cx: &Context) -> bool576     fn register(&self, oper: Operation, cx: &Context) -> bool {
577         self.0.receivers.register(oper, cx);
578         self.is_ready()
579     }
580 
unregister(&self, oper: Operation)581     fn unregister(&self, oper: Operation) {
582         self.0.receivers.unregister(oper);
583     }
584 
accept(&self, token: &mut Token, _cx: &Context) -> bool585     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
586         self.try_select(token)
587     }
588 
is_ready(&self) -> bool589     fn is_ready(&self) -> bool {
590         !self.0.is_empty() || self.0.is_disconnected()
591     }
592 
watch(&self, oper: Operation, cx: &Context) -> bool593     fn watch(&self, oper: Operation, cx: &Context) -> bool {
594         self.0.receivers.watch(oper, cx);
595         self.is_ready()
596     }
597 
unwatch(&self, oper: Operation)598     fn unwatch(&self, oper: Operation) {
599         self.0.receivers.unwatch(oper);
600     }
601 }
602 
603 impl<T> SelectHandle for Sender<'_, T> {
try_select(&self, token: &mut Token) -> bool604     fn try_select(&self, token: &mut Token) -> bool {
605         self.0.start_send(token)
606     }
607 
deadline(&self) -> Option<Instant>608     fn deadline(&self) -> Option<Instant> {
609         None
610     }
611 
register(&self, oper: Operation, cx: &Context) -> bool612     fn register(&self, oper: Operation, cx: &Context) -> bool {
613         self.0.senders.register(oper, cx);
614         self.is_ready()
615     }
616 
unregister(&self, oper: Operation)617     fn unregister(&self, oper: Operation) {
618         self.0.senders.unregister(oper);
619     }
620 
accept(&self, token: &mut Token, _cx: &Context) -> bool621     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
622         self.try_select(token)
623     }
624 
is_ready(&self) -> bool625     fn is_ready(&self) -> bool {
626         !self.0.is_full() || self.0.is_disconnected()
627     }
628 
watch(&self, oper: Operation, cx: &Context) -> bool629     fn watch(&self, oper: Operation, cx: &Context) -> bool {
630         self.0.senders.watch(oper, cx);
631         self.is_ready()
632     }
633 
unwatch(&self, oper: Operation)634     fn unwatch(&self, oper: Operation) {
635         self.0.senders.unwatch(oper);
636     }
637 }
638