use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Self = Self {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements.
/// There is no limit to how many elements can be in the queue at a time. However, since segments
/// need to be dynamically allocated as elements get pushed, this queue is somewhat slower than
/// [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
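            // Worked example of the index encoding (added illustration, derived from the
            // constants above): with `SHIFT = 1` and `LAP = 32`, a tail index of 70 encodes
            // position 70 >> 1 = 35, which lands at offset 35 % 32 = 3 in the current block,
            // so this push would claim slot 3; bumping the index by `1 << SHIFT` moves it to
            // the next slot.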
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
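                    // Note (added): the producer in `push` advances the tail index before it
                    // writes the value and sets `WRITE`, so this slot may not be filled yet;
                    // `wait_write` below spins until the `WRITE` bit is observed.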
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
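            // (Added note: `block` is null only if no element was ever pushed, since the
            // first block is allocated lazily on the first `push`; in that case there is
            // nothing left to free.)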
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything has
                // been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last index in a block's lap has no slot, so skip it.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}
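// A minimal usage sketch, not part of the upstream file: it pushes from one thread and drains
// from another, checking that every value comes back out in FIFO order. The module name is
// made up, and the use of `std` assumes tests are built with the standard library available
// (this sketch would fit equally well in an external integration test).
#[cfg(test)]
mod usage_sketch {
    use super::SegQueue;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn push_then_drain_across_threads() {
        let q = Arc::new(SegQueue::new());

        // Producer thread pushes 0..100, enough to span several blocks of BLOCK_CAP = 31 slots.
        let producer = {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                for i in 0..100 {
                    q.push(i);
                }
            })
        };
        producer.join().unwrap();

        // Consumer thread drains the queue; with a single producer the values come out in order.
        let consumer = {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                let mut expected = 0;
                while let Some(v) = q.pop() {
                    assert_eq!(v, expected);
                    expected += 1;
                }
                expected
            })
        };
        assert_eq!(consumer.join().unwrap(), 100);
    }
}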