use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
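
// For instance (illustrative only): a slot whose state is `WRITE | READ` (0b011) has
// been written and then consumed, while `WRITE | DESTROY` (0b101) means the block is
// being torn down but a reader has not yet marked the slot as read, so that reader
// will finish the destruction.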

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
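
// Illustrative decomposition of a queue index (an example, not part of the algorithm):
// with `SHIFT = 1` and `LAP = 32`, an index value of 71 encodes
//   * metadata bits: 71 & ((1 << SHIFT) - 1) = 1  (`HAS_NEXT` is set),
//   * slot number:   71 >> SHIFT = 35,
//   * offset:        35 % LAP = 3  (fourth slot of its block),
//   * lap:           35 / LAP = 1  (second block).
// An offset equal to `BLOCK_CAP` (31) marks the end-of-block position and holds no slot.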

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Self = Self {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
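///
/// Since the queue is multi-producer multi-consumer, it can also be shared across threads
/// without extra locking. A minimal sketch (using only `push`, `pop`, and standard threads):
///
/// ```
/// use std::sync::Arc;
/// use std::thread;
///
/// use crossbeam_queue::SegQueue;
///
/// let q = Arc::new(SegQueue::new());
///
/// let producer = {
///     let q = Arc::clone(&q);
///     thread::spawn(move || {
///         for i in 0..100 {
///             q.push(i);
///         }
///     })
/// };
/// producer.join().unwrap();
///
/// let mut sum = 0;
/// while let Some(x) = q.pop() {
///     sum += x;
/// }
/// assert_eq!(sum, (0..100).sum::<i32>());
/// ```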
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
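                // Full fence between loading the head (above) and the tail (below) so
                // that the emptiness check does not observe a stale tail and miss an
                // element that a concurrent `push` has already committed.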
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
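                // For example (illustrative): after the fix-ups above, head = 3 and
                // tail = 35 give 35 - 3 - 35 / 32 = 31 elements, namely offsets 3..=30
                // of the first block (28 values) plus offsets 0..=2 of the second (3).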
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

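/// An owning iterator over the elements of a `SegQueue<T>`, returned by its
/// `IntoIterator` implementation.
///
/// # Examples
///
/// A short sketch of draining a queue by value:
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
/// q.push(1);
/// q.push(2);
///
/// // Elements are yielded in FIFO order.
/// let v: Vec<i32> = q.into_iter().collect();
/// assert_eq!(v, [1, 2]);
/// ```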
#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything
                // has been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The index at the end of each block holds no value, so skip it.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}