1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. 2 //! 3 //! Source: 4 //! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> 5 6 use alloc::boxed::Box; 7 use core::cell::UnsafeCell; 8 use core::fmt; 9 use core::mem::{self, MaybeUninit}; 10 use core::panic::{RefUnwindSafe, UnwindSafe}; 11 use core::sync::atomic::{self, AtomicUsize, Ordering}; 12 13 use crossbeam_utils::{Backoff, CachePadded}; 14 15 /// A slot in a queue. 16 struct Slot<T> { 17 /// The current stamp. 18 /// 19 /// If the stamp equals the tail, this node will be next written to. If it equals head + 1, 20 /// this node will be next read from. 21 stamp: AtomicUsize, 22 23 /// The value in this slot. 24 value: UnsafeCell<MaybeUninit<T>>, 25 } 26 27 /// A bounded multi-producer multi-consumer queue. 28 /// 29 /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed 30 /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an 31 /// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for 32 /// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue 33 /// a bit faster than [`SegQueue`]. 34 /// 35 /// [`force_push`]: ArrayQueue::force_push 36 /// [`SegQueue`]: super::SegQueue 37 /// 38 /// # Examples 39 /// 40 /// ``` 41 /// use crossbeam_queue::ArrayQueue; 42 /// 43 /// let q = ArrayQueue::new(2); 44 /// 45 /// assert_eq!(q.push('a'), Ok(())); 46 /// assert_eq!(q.push('b'), Ok(())); 47 /// assert_eq!(q.push('c'), Err('c')); 48 /// assert_eq!(q.pop(), Some('a')); 49 /// ``` 50 pub struct ArrayQueue<T> { 51 /// The head of the queue. 52 /// 53 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 54 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 55 /// 56 /// Elements are popped from the head of the queue. 57 head: CachePadded<AtomicUsize>, 58 59 /// The tail of the queue. 60 /// 61 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a 62 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap. 63 /// 64 /// Elements are pushed into the tail of the queue. 65 tail: CachePadded<AtomicUsize>, 66 67 /// The buffer holding slots. 68 buffer: Box<[Slot<T>]>, 69 70 /// The queue capacity. 71 cap: usize, 72 73 /// A stamp with the value of `{ lap: 1, index: 0 }`. 74 one_lap: usize, 75 } 76 77 unsafe impl<T: Send> Sync for ArrayQueue<T> {} 78 unsafe impl<T: Send> Send for ArrayQueue<T> {} 79 80 impl<T> UnwindSafe for ArrayQueue<T> {} 81 impl<T> RefUnwindSafe for ArrayQueue<T> {} 82 83 impl<T> ArrayQueue<T> { 84 /// Creates a new bounded queue with the given capacity. 85 /// 86 /// # Panics 87 /// 88 /// Panics if the capacity is zero. 89 /// 90 /// # Examples 91 /// 92 /// ``` 93 /// use crossbeam_queue::ArrayQueue; 94 /// 95 /// let q = ArrayQueue::<i32>::new(100); 96 /// ``` new(cap: usize) -> ArrayQueue<T>97 pub fn new(cap: usize) -> ArrayQueue<T> { 98 assert!(cap > 0, "capacity must be non-zero"); 99 100 // Head is initialized to `{ lap: 0, index: 0 }`. 101 // Tail is initialized to `{ lap: 0, index: 0 }`. 102 let head = 0; 103 let tail = 0; 104 105 // Allocate a buffer of `cap` slots initialized 106 // with stamps. 107 let buffer: Box<[Slot<T>]> = (0..cap) 108 .map(|i| { 109 // Set the stamp to `{ lap: 0, index: i }`. 110 Slot { 111 stamp: AtomicUsize::new(i), 112 value: UnsafeCell::new(MaybeUninit::uninit()), 113 } 114 }) 115 .collect(); 116 117 // One lap is the smallest power of two greater than `cap`. 118 let one_lap = (cap + 1).next_power_of_two(); 119 120 ArrayQueue { 121 buffer, 122 cap, 123 one_lap, 124 head: CachePadded::new(AtomicUsize::new(head)), 125 tail: CachePadded::new(AtomicUsize::new(tail)), 126 } 127 } 128 push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T> where F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,129 fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T> 130 where 131 F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>, 132 { 133 let backoff = Backoff::new(); 134 let mut tail = self.tail.load(Ordering::Relaxed); 135 136 loop { 137 // Deconstruct the tail. 138 let index = tail & (self.one_lap - 1); 139 let lap = tail & !(self.one_lap - 1); 140 141 let new_tail = if index + 1 < self.cap { 142 // Same lap, incremented index. 143 // Set to `{ lap: lap, index: index + 1 }`. 144 tail + 1 145 } else { 146 // One lap forward, index wraps around to zero. 147 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 148 lap.wrapping_add(self.one_lap) 149 }; 150 151 // Inspect the corresponding slot. 152 debug_assert!(index < self.buffer.len()); 153 let slot = unsafe { self.buffer.get_unchecked(index) }; 154 let stamp = slot.stamp.load(Ordering::Acquire); 155 156 // If the tail and the stamp match, we may attempt to push. 157 if tail == stamp { 158 // Try moving the tail. 159 match self.tail.compare_exchange_weak( 160 tail, 161 new_tail, 162 Ordering::SeqCst, 163 Ordering::Relaxed, 164 ) { 165 Ok(_) => { 166 // Write the value into the slot and update the stamp. 167 unsafe { 168 slot.value.get().write(MaybeUninit::new(value)); 169 } 170 slot.stamp.store(tail + 1, Ordering::Release); 171 return Ok(()); 172 } 173 Err(t) => { 174 tail = t; 175 backoff.spin(); 176 } 177 } 178 } else if stamp.wrapping_add(self.one_lap) == tail + 1 { 179 atomic::fence(Ordering::SeqCst); 180 value = f(value, tail, new_tail, slot)?; 181 backoff.spin(); 182 tail = self.tail.load(Ordering::Relaxed); 183 } else { 184 // Snooze because we need to wait for the stamp to get updated. 185 backoff.snooze(); 186 tail = self.tail.load(Ordering::Relaxed); 187 } 188 } 189 } 190 191 /// Attempts to push an element into the queue. 192 /// 193 /// If the queue is full, the element is returned back as an error. 194 /// 195 /// # Examples 196 /// 197 /// ``` 198 /// use crossbeam_queue::ArrayQueue; 199 /// 200 /// let q = ArrayQueue::new(1); 201 /// 202 /// assert_eq!(q.push(10), Ok(())); 203 /// assert_eq!(q.push(20), Err(20)); 204 /// ``` push(&self, value: T) -> Result<(), T>205 pub fn push(&self, value: T) -> Result<(), T> { 206 self.push_or_else(value, |v, tail, _, _| { 207 let head = self.head.load(Ordering::Relaxed); 208 209 // If the head lags one lap behind the tail as well... 210 if head.wrapping_add(self.one_lap) == tail { 211 // ...then the queue is full. 212 Err(v) 213 } else { 214 Ok(v) 215 } 216 }) 217 } 218 219 /// Pushes an element into the queue, replacing the oldest element if necessary. 220 /// 221 /// If the queue is full, the oldest element is replaced and returned, 222 /// otherwise `None` is returned. 223 /// 224 /// # Examples 225 /// 226 /// ``` 227 /// use crossbeam_queue::ArrayQueue; 228 /// 229 /// let q = ArrayQueue::new(2); 230 /// 231 /// assert_eq!(q.force_push(10), None); 232 /// assert_eq!(q.force_push(20), None); 233 /// assert_eq!(q.force_push(30), Some(10)); 234 /// assert_eq!(q.pop(), Some(20)); 235 /// ``` force_push(&self, value: T) -> Option<T>236 pub fn force_push(&self, value: T) -> Option<T> { 237 self.push_or_else(value, |v, tail, new_tail, slot| { 238 let head = tail.wrapping_sub(self.one_lap); 239 let new_head = new_tail.wrapping_sub(self.one_lap); 240 241 // Try moving the head. 242 if self 243 .head 244 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) 245 .is_ok() 246 { 247 // Move the tail. 248 self.tail.store(new_tail, Ordering::SeqCst); 249 250 // Swap the previous value. 251 let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() }; 252 253 // Update the stamp. 254 slot.stamp.store(tail + 1, Ordering::Release); 255 256 Err(old) 257 } else { 258 Ok(v) 259 } 260 }) 261 .err() 262 } 263 264 /// Attempts to pop an element from the queue. 265 /// 266 /// If the queue is empty, `None` is returned. 267 /// 268 /// # Examples 269 /// 270 /// ``` 271 /// use crossbeam_queue::ArrayQueue; 272 /// 273 /// let q = ArrayQueue::new(1); 274 /// assert_eq!(q.push(10), Ok(())); 275 /// 276 /// assert_eq!(q.pop(), Some(10)); 277 /// assert!(q.pop().is_none()); 278 /// ``` pop(&self) -> Option<T>279 pub fn pop(&self) -> Option<T> { 280 let backoff = Backoff::new(); 281 let mut head = self.head.load(Ordering::Relaxed); 282 283 loop { 284 // Deconstruct the head. 285 let index = head & (self.one_lap - 1); 286 let lap = head & !(self.one_lap - 1); 287 288 // Inspect the corresponding slot. 289 debug_assert!(index < self.buffer.len()); 290 let slot = unsafe { self.buffer.get_unchecked(index) }; 291 let stamp = slot.stamp.load(Ordering::Acquire); 292 293 // If the the stamp is ahead of the head by 1, we may attempt to pop. 294 if head + 1 == stamp { 295 let new = if index + 1 < self.cap { 296 // Same lap, incremented index. 297 // Set to `{ lap: lap, index: index + 1 }`. 298 head + 1 299 } else { 300 // One lap forward, index wraps around to zero. 301 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 302 lap.wrapping_add(self.one_lap) 303 }; 304 305 // Try moving the head. 306 match self.head.compare_exchange_weak( 307 head, 308 new, 309 Ordering::SeqCst, 310 Ordering::Relaxed, 311 ) { 312 Ok(_) => { 313 // Read the value from the slot and update the stamp. 314 let msg = unsafe { slot.value.get().read().assume_init() }; 315 slot.stamp 316 .store(head.wrapping_add(self.one_lap), Ordering::Release); 317 return Some(msg); 318 } 319 Err(h) => { 320 head = h; 321 backoff.spin(); 322 } 323 } 324 } else if stamp == head { 325 atomic::fence(Ordering::SeqCst); 326 let tail = self.tail.load(Ordering::Relaxed); 327 328 // If the tail equals the head, that means the channel is empty. 329 if tail == head { 330 return None; 331 } 332 333 backoff.spin(); 334 head = self.head.load(Ordering::Relaxed); 335 } else { 336 // Snooze because we need to wait for the stamp to get updated. 337 backoff.snooze(); 338 head = self.head.load(Ordering::Relaxed); 339 } 340 } 341 } 342 343 /// Returns the capacity of the queue. 344 /// 345 /// # Examples 346 /// 347 /// ``` 348 /// use crossbeam_queue::ArrayQueue; 349 /// 350 /// let q = ArrayQueue::<i32>::new(100); 351 /// 352 /// assert_eq!(q.capacity(), 100); 353 /// ``` capacity(&self) -> usize354 pub fn capacity(&self) -> usize { 355 self.cap 356 } 357 358 /// Returns `true` if the queue is empty. 359 /// 360 /// # Examples 361 /// 362 /// ``` 363 /// use crossbeam_queue::ArrayQueue; 364 /// 365 /// let q = ArrayQueue::new(100); 366 /// 367 /// assert!(q.is_empty()); 368 /// q.push(1).unwrap(); 369 /// assert!(!q.is_empty()); 370 /// ``` is_empty(&self) -> bool371 pub fn is_empty(&self) -> bool { 372 let head = self.head.load(Ordering::SeqCst); 373 let tail = self.tail.load(Ordering::SeqCst); 374 375 // Is the tail lagging one lap behind head? 376 // Is the tail equal to the head? 377 // 378 // Note: If the head changes just before we load the tail, that means there was a moment 379 // when the channel was not empty, so it is safe to just return `false`. 380 tail == head 381 } 382 383 /// Returns `true` if the queue is full. 384 /// 385 /// # Examples 386 /// 387 /// ``` 388 /// use crossbeam_queue::ArrayQueue; 389 /// 390 /// let q = ArrayQueue::new(1); 391 /// 392 /// assert!(!q.is_full()); 393 /// q.push(1).unwrap(); 394 /// assert!(q.is_full()); 395 /// ``` is_full(&self) -> bool396 pub fn is_full(&self) -> bool { 397 let tail = self.tail.load(Ordering::SeqCst); 398 let head = self.head.load(Ordering::SeqCst); 399 400 // Is the head lagging one lap behind tail? 401 // 402 // Note: If the tail changes just before we load the head, that means there was a moment 403 // when the queue was not full, so it is safe to just return `false`. 404 head.wrapping_add(self.one_lap) == tail 405 } 406 407 /// Returns the number of elements in the queue. 408 /// 409 /// # Examples 410 /// 411 /// ``` 412 /// use crossbeam_queue::ArrayQueue; 413 /// 414 /// let q = ArrayQueue::new(100); 415 /// assert_eq!(q.len(), 0); 416 /// 417 /// q.push(10).unwrap(); 418 /// assert_eq!(q.len(), 1); 419 /// 420 /// q.push(20).unwrap(); 421 /// assert_eq!(q.len(), 2); 422 /// ``` len(&self) -> usize423 pub fn len(&self) -> usize { 424 loop { 425 // Load the tail, then load the head. 426 let tail = self.tail.load(Ordering::SeqCst); 427 let head = self.head.load(Ordering::SeqCst); 428 429 // If the tail didn't change, we've got consistent values to work with. 430 if self.tail.load(Ordering::SeqCst) == tail { 431 let hix = head & (self.one_lap - 1); 432 let tix = tail & (self.one_lap - 1); 433 434 return if hix < tix { 435 tix - hix 436 } else if hix > tix { 437 self.cap - hix + tix 438 } else if tail == head { 439 0 440 } else { 441 self.cap 442 }; 443 } 444 } 445 } 446 } 447 448 impl<T> Drop for ArrayQueue<T> { drop(&mut self)449 fn drop(&mut self) { 450 if mem::needs_drop::<T>() { 451 // Get the index of the head. 452 let head = *self.head.get_mut(); 453 let tail = *self.tail.get_mut(); 454 455 let hix = head & (self.one_lap - 1); 456 let tix = tail & (self.one_lap - 1); 457 458 let len = if hix < tix { 459 tix - hix 460 } else if hix > tix { 461 self.cap - hix + tix 462 } else if tail == head { 463 0 464 } else { 465 self.cap 466 }; 467 468 // Loop over all slots that hold a message and drop them. 469 for i in 0..len { 470 // Compute the index of the next slot holding a message. 471 let index = if hix + i < self.cap { 472 hix + i 473 } else { 474 hix + i - self.cap 475 }; 476 477 unsafe { 478 debug_assert!(index < self.buffer.len()); 479 let slot = self.buffer.get_unchecked_mut(index); 480 (*slot.value.get()).assume_init_drop(); 481 } 482 } 483 } 484 } 485 } 486 487 impl<T> fmt::Debug for ArrayQueue<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result488 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 489 f.pad("ArrayQueue { .. }") 490 } 491 } 492 493 impl<T> IntoIterator for ArrayQueue<T> { 494 type Item = T; 495 496 type IntoIter = IntoIter<T>; 497 into_iter(self) -> Self::IntoIter498 fn into_iter(self) -> Self::IntoIter { 499 IntoIter { value: self } 500 } 501 } 502 503 #[derive(Debug)] 504 pub struct IntoIter<T> { 505 value: ArrayQueue<T>, 506 } 507 508 impl<T> Iterator for IntoIter<T> { 509 type Item = T; 510 next(&mut self) -> Option<Self::Item>511 fn next(&mut self) -> Option<Self::Item> { 512 let value = &mut self.value; 513 let head = *value.head.get_mut(); 514 if value.head.get_mut() != value.tail.get_mut() { 515 let index = head & (value.one_lap - 1); 516 let lap = head & !(value.one_lap - 1); 517 // SAFETY: We have mutable access to this, so we can read without 518 // worrying about concurrency. Furthermore, we know this is 519 // initialized because it is the value pointed at by `value.head` 520 // and this is a non-empty queue. 521 let val = unsafe { 522 debug_assert!(index < value.buffer.len()); 523 let slot = value.buffer.get_unchecked_mut(index); 524 slot.value.get().read().assume_init() 525 }; 526 let new = if index + 1 < value.cap { 527 // Same lap, incremented index. 528 // Set to `{ lap: lap, index: index + 1 }`. 529 head + 1 530 } else { 531 // One lap forward, index wraps around to zero. 532 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. 533 lap.wrapping_add(value.one_lap) 534 }; 535 *value.head.get_mut() = new; 536 Option::Some(val) 537 } else { 538 Option::None 539 } 540 } 541 } 542