//! A concurrent, lock-free, FIFO list.
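//!
//! A rough usage sketch of this module's crate-private API (illustrative only;
//! names follow the items defined below, and in practice the surrounding
//! channel implementation drives these handles):
//!
//! ```ignore
//! let (tx, mut rx) = channel::<u32>();
//!
//! // Any number of senders may push concurrently.
//! tx.push(1);
//! tx.push(2);
//!
//! // The single receiver pops values in FIFO order.
//! assert!(matches!(rx.pop(&tx), Some(block::Read::Value(1))));
//! assert!(matches!(rx.pop(&tx), Some(block::Read::Value(2))));
//!
//! // After the send half is closed and all values have been popped,
//! // the receiver observes `Read::Closed`.
//! tx.close();
//! assert!(matches!(rx.pop(&tx), Some(block::Read::Closed)));
//! ```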

use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::loom::thread;
use crate::sync::mpsc::block::{self, Block};

use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// List queue transmit handle.
pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This references a block and offset
    /// into the block.
    tail_position: AtomicUsize,
}

/// List queue receive handle.
pub(crate) struct Rx<T> {
    /// Pointer to the block being processed.
    head: NonNull<Block<T>>,

    /// Next slot index to process.
    index: usize,

    /// Pointer to the next block pending release.
    free_head: NonNull<Block<T>>,
}

/// Return value of `Rx::try_pop`.
pub(crate) enum TryPopResult<T> {
    /// Successfully popped a value.
    Ok(T),
    /// The channel is empty.
    Empty,
    /// The channel is empty and closed.
    Closed,
    /// The channel is not empty, but the first value is being written.
    Busy,
}
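
// A sketch of how a caller might consume `TryPopResult` (illustrative only;
// the real call sites live in the channel layer built on top of this list):
//
//     match rx.try_pop(&tx) {
//         TryPopResult::Ok(value) => { /* deliver `value` */ }
//         TryPopResult::Closed => { /* no further values will arrive */ }
//         TryPopResult::Empty => { /* nothing is queued right now */ }
//         TryPopResult::Busy => { /* a value exists but is still being written */ }
//     }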

pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
    // Create the initial block shared between the tx and rx halves.
    let initial_block = Block::new(0);
    let initial_block_ptr = Box::into_raw(initial_block);

    let tx = Tx {
        block_tail: AtomicPtr::new(initial_block_ptr),
        tail_position: AtomicUsize::new(0),
    };

    let head = NonNull::new(initial_block_ptr).unwrap();

    let rx = Rx {
        head,
        index: 0,
        free_head: head,
    };

    (tx, rx)
}

impl<T> Tx<T> {
    /// Pushes a value into the list.
    pub(crate) fn push(&self, value: T) {
        // First, claim a slot for the value. `Acquire` is used here to
        // synchronize with the `fetch_add` in `reclaim_blocks`.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        // Load the current block and write the value
        let block = self.find_block(slot_index);

        unsafe {
            // Write the value to the block
            block.as_ref().write(slot_index, value);
        }
    }

    /// Closes the send half of the list.
    ///
    /// The process is similar to pushing a value, but instead of writing the
    /// value and setting the ready flag, the `TX_CLOSED` flag is set on the block.
    pub(crate) fn close(&self) {
        // First, claim a slot for the value. This is the last slot that will be
        // claimed.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        let block = self.find_block(slot_index);

        unsafe { block.as_ref().tx_close() }
    }

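    /// Returns a pointer to the block containing `slot_index`, growing the
    /// linked list if that block has not been allocated yet.
    ///
    /// While walking the list, the routine opportunistically advances
    /// `block_tail` past finalized blocks (see the comments in the body).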
    fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
        // The start index of the block that contains `index`.
        let start_index = block::start_index(slot_index);

        // The index offset into the block
        let offset = block::offset(slot_index);

        // Load the current head of the block
        let mut block_ptr = self.block_tail.load(Acquire);

        let block = unsafe { &*block_ptr };

        // Calculate the distance between the tail ptr and the target block
        let distance = block.distance(start_index);

        // Decide if this call to `find_block` should attempt to update the
        // `block_tail` pointer.
        //
        // Updating `block_tail` is not always performed in order to reduce
        // contention.
        //
        // When set, as the routine walks the linked list, it attempts to update
        // `block_tail`. If the update cannot be performed, `try_updating_tail`
        // is unset.
        let mut try_updating_tail = distance > offset;

        // Walk the linked list of blocks until the block with `start_index` is
        // found.
        loop {
            let block = unsafe { &(*block_ptr) };

            if block.is_at_index(start_index) {
                return unsafe { NonNull::new_unchecked(block_ptr) };
            }

            let next_block = block
                .load_next(Acquire)
                // There is no allocated next block, grow the linked list.
                .unwrap_or_else(|| block.grow());

            // If the block is **not** final, then the tail pointer cannot be
            // advanced any more.
            try_updating_tail &= block.is_final();

            if try_updating_tail {
                // Advancing `block_tail` must happen when walking the linked
                // list. `block_tail` may not advance past any blocks that are
                // not "final". At the point a block is finalized, it is unknown
                // if there are any prior blocks that are unfinalized, which
                // makes it impossible to advance `block_tail`.
                //
                // While walking the linked list, `block_tail` can be advanced
                // as long as finalized blocks are traversed.
                //
                // Release ordering is used to ensure that any subsequent reads
                // are able to see the memory pointed to by `block_tail`.
                //
                // Acquire is not needed, as no "actual" values are accessed
                // here. At this point, the linked list is only being walked to
                // acquire blocks.
                if self
                    .block_tail
                    .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
                    .is_ok()
                {
                    // Synchronize with any senders
                    let tail_position = self.tail_position.fetch_add(0, Release);

                    unsafe {
                        block.tx_release(tail_position);
                    }
                } else {
                    // A concurrent sender is also working on advancing
                    // `block_tail` and this thread is falling behind.
                    //
                    // Stop trying to advance the tail pointer
                    try_updating_tail = false;
                }
            }

            block_ptr = next_block.as_ptr();

            thread::yield_now();
        }
    }

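    /// Reinserts a fully consumed block at the end of the list for reuse, or
    /// drops it if that fails.
    ///
    /// # Safety
    ///
    /// The block must already have been removed from the linked list so that
    /// the caller holds exclusive ownership of it (see the comments below).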
    pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
        // The block has been removed from the linked list and ownership
        // is reclaimed.
        //
        // Before dropping the block, see if it can be reused by
        // inserting it back at the end of the linked list.
        //
        // First, reset the data
        block.as_mut().reclaim();

        let mut reused = false;

        // Attempt to insert the block at the end
        //
        // Walk at most three times
        //
        let curr_ptr = self.block_tail.load(Acquire);

        // The pointer can never be null
        debug_assert!(!curr_ptr.is_null());

        let mut curr = NonNull::new_unchecked(curr_ptr);

        // TODO: Unify this logic with Block::grow
        for _ in 0..3 {
            match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
                Ok(()) => {
                    reused = true;
                    break;
                }
                Err(next) => {
                    curr = next;
                }
            }
        }

        if !reused {
            let _ = Box::from_raw(block.as_ptr());
        }
    }

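    /// Returns `true` if the block currently pointed to by `block_tail` has
    /// its `TX_CLOSED` flag set (set by `close()`).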
    pub(crate) fn is_closed(&self) -> bool {
        let tail = self.block_tail.load(Acquire);

        unsafe {
            let tail_block = &*tail;
            tail_block.is_closed()
        }
    }
}

impl<T> fmt::Debug for Tx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx")
            .field("block_tail", &self.block_tail.load(Relaxed))
            .field("tail_position", &self.tail_position.load(Relaxed))
            .finish()
    }
}

impl<T> Rx<T> {
    pub(crate) fn is_empty(&self, tx: &Tx<T>) -> bool {
        let block = unsafe { self.head.as_ref() };
        if block.has_value(self.index) {
            return false;
        }

        // It is possible that a block has no value "now" but the list is still not empty.
        // To be sure, it is necessary to check the length of the list.
        self.len(tx) == 0
    }

    pub(crate) fn len(&self, tx: &Tx<T>) -> usize {
        // When all the senders are dropped, `close()` claims one final slot in
        // the tail position for the closed marker rather than for a value, so
        // that slot is subtracted from the length.
        let tail_position = tx.tail_position.load(Acquire);
        tail_position - self.index - (tx.is_closed() as usize)
    }

    /// Pops the next value off the queue.
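    ///
    /// Returns `None` if no value is ready at the current index. This can
    /// happen even when a later slot already holds a fully written value,
    /// since values are only returned in FIFO order (see `try_pop`).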
    pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
        // Advance `head`, if needed
        if !self.try_advancing_head() {
            return None;
        }

        self.reclaim_blocks(tx);

        unsafe {
            let block = self.head.as_ref();

            let ret = block.read(self.index);

            if let Some(block::Read::Value(..)) = ret {
                self.index = self.index.wrapping_add(1);
            }

            ret
        }
    }

    /// Pops the next value off the queue, detecting whether the block
    /// is busy or empty on failure.
    ///
    /// This function exists because `Rx::pop` can return `None` even if the
    /// channel's queue contains a message that has been completely written.
    /// This can happen if the fully delivered message is behind another message
    /// that is in the middle of being written to the block, since the channel
    /// can't return the messages out of order.
    pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
        let tail_position = tx.tail_position.load(Acquire);
        let result = self.pop(tx);

        match result {
            Some(block::Read::Value(t)) => TryPopResult::Ok(t),
            Some(block::Read::Closed) => TryPopResult::Closed,
            None if tail_position == self.index => TryPopResult::Empty,
            None => TryPopResult::Busy,
        }
    }

    /// Tries advancing the block pointer to the block referenced by `self.index`.
    ///
    /// Returns `true` if successful, `false` if there is no next block to load.
    fn try_advancing_head(&mut self) -> bool {
        let block_index = block::start_index(self.index);

        loop {
            let next_block = {
                let block = unsafe { self.head.as_ref() };

                if block.is_at_index(block_index) {
                    return true;
                }

                block.load_next(Acquire)
            };

            let next_block = match next_block {
                Some(next_block) => next_block,
                None => {
                    return false;
                }
            };

            self.head = next_block;

            thread::yield_now();
        }
    }

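    /// Walks the free list from `free_head` towards `head`, handing fully
    /// consumed blocks back to the tx half for reuse (see `Tx::reclaim_block`).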
    fn reclaim_blocks(&mut self, tx: &Tx<T>) {
        while self.free_head != self.head {
            unsafe {
                // Get a handle to the block that will be freed and update
                // `free_head` to point to the next block.
                let block = self.free_head;

                let observed_tail_position = block.as_ref().observed_tail_position();

                let required_index = match observed_tail_position {
                    Some(i) => i,
                    None => return,
                };

                if required_index > self.index {
                    return;
                }

                // We may read the next pointer with `Relaxed` ordering as it is
                // guaranteed that the `reclaim_blocks` routine trails the `recv`
                // routine. Any memory accessed by `reclaim_blocks` has already
                // been acquired by `recv`.
                let next_block = block.as_ref().load_next(Relaxed);

                // Update the free list head
                self.free_head = next_block.unwrap();

                // Push the emptied block onto the back of the queue, making it
                // available to senders.
                tx.reclaim_block(block);
            }

            thread::yield_now();
        }
    }

    /// Effectively `Drop` all the blocks. Should only be called once, when
    /// the list is dropping.
    pub(super) unsafe fn free_blocks(&mut self) {
        debug_assert_ne!(self.free_head, NonNull::dangling());

        let mut cur = Some(self.free_head);

        #[cfg(debug_assertions)]
        {
            // Set the pointers to a dangling value so that the debug assertion
            // above catches a second call to `free_blocks`.
            self.free_head = NonNull::dangling();
            self.head = NonNull::dangling();
        }

        while let Some(block) = cur {
            cur = block.as_ref().load_next(Relaxed);
            drop(Box::from_raw(block.as_ptr()));
        }
    }
}

impl<T> fmt::Debug for Rx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx")
            .field("head", &self.head)
            .field("index", &self.index)
            .field("free_head", &self.free_head)
            .finish()
    }
}