1 use std::cell::{Cell, UnsafeCell}; 2 use std::cmp; 3 use std::fmt; 4 use std::marker::PhantomData; 5 use std::mem::{self, MaybeUninit}; 6 use std::ptr; 7 use std::slice; 8 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; 9 use std::sync::Arc; 10 11 use crossbeam_epoch::{self as epoch, Atomic, Owned}; 12 use crossbeam_utils::{Backoff, CachePadded}; 13 14 // Minimum buffer capacity. 15 const MIN_CAP: usize = 64; 16 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`. 17 const MAX_BATCH: usize = 32; 18 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets 19 // deallocated as soon as possible. 20 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10; 21 22 /// A buffer that holds tasks in a worker queue. 23 /// 24 /// This is just a pointer to the buffer and its length - dropping an instance of this struct will 25 /// *not* deallocate the buffer. 26 struct Buffer<T> { 27 /// Pointer to the allocated memory. 28 ptr: *mut T, 29 30 /// Capacity of the buffer. Always a power of two. 31 cap: usize, 32 } 33 34 unsafe impl<T> Send for Buffer<T> {} 35 36 impl<T> Buffer<T> { 37 /// Allocates a new buffer with the specified capacity. alloc(cap: usize) -> Buffer<T>38 fn alloc(cap: usize) -> Buffer<T> { 39 debug_assert_eq!(cap, cap.next_power_of_two()); 40 41 let ptr = Box::into_raw( 42 (0..cap) 43 .map(|_| MaybeUninit::<T>::uninit()) 44 .collect::<Box<[_]>>(), 45 ) 46 .cast::<T>(); 47 48 Buffer { ptr, cap } 49 } 50 51 /// Deallocates the buffer. dealloc(self)52 unsafe fn dealloc(self) { 53 drop(Box::from_raw(slice::from_raw_parts_mut( 54 self.ptr.cast::<MaybeUninit<T>>(), 55 self.cap, 56 ))); 57 } 58 59 /// Returns a pointer to the task at the specified `index`. at(&self, index: isize) -> *mut T60 unsafe fn at(&self, index: isize) -> *mut T { 61 // `self.cap` is always a power of two. 62 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we 63 // don't actually have the right to access this memory. 64 self.ptr.offset(index & (self.cap - 1) as isize) 65 } 66 67 /// Writes `task` into the specified `index`. 68 /// 69 /// This method might be concurrently called with another `read` at the same index, which is 70 /// technically speaking a data race and therefore UB. We should use an atomic store here, but 71 /// that would be more expensive and difficult to implement generically for all types `T`. 72 /// Hence, as a hack, we use a volatile write instead. write(&self, index: isize, task: MaybeUninit<T>)73 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) { 74 ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) 75 } 76 77 /// Reads a task from the specified `index`. 78 /// 79 /// This method might be concurrently called with another `write` at the same index, which is 80 /// technically speaking a data race and therefore UB. We should use an atomic load here, but 81 /// that would be more expensive and difficult to implement generically for all types `T`. 82 /// Hence, as a hack, we use a volatile load instead. read(&self, index: isize) -> MaybeUninit<T>83 unsafe fn read(&self, index: isize) -> MaybeUninit<T> { 84 ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) 85 } 86 } 87 88 impl<T> Clone for Buffer<T> { clone(&self) -> Buffer<T>89 fn clone(&self) -> Buffer<T> { 90 *self 91 } 92 } 93 94 impl<T> Copy for Buffer<T> {} 95 96 /// Internal queue data shared between the worker and stealers. 97 /// 98 /// The implementation is based on the following work: 99 /// 100 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev] 101 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models. 102 /// PPoPP 2013.][weak-mem] 103 /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++ 104 /// atomics. OOPSLA 2013.][checker] 105 /// 106 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974 107 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524 108 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514 109 struct Inner<T> { 110 /// The front index. 111 front: AtomicIsize, 112 113 /// The back index. 114 back: AtomicIsize, 115 116 /// The underlying buffer. 117 buffer: CachePadded<Atomic<Buffer<T>>>, 118 } 119 120 impl<T> Drop for Inner<T> { drop(&mut self)121 fn drop(&mut self) { 122 // Load the back index, front index, and buffer. 123 let b = *self.back.get_mut(); 124 let f = *self.front.get_mut(); 125 126 unsafe { 127 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected()); 128 129 // Go through the buffer from front to back and drop all tasks in the queue. 130 let mut i = f; 131 while i != b { 132 buffer.deref().at(i).drop_in_place(); 133 i = i.wrapping_add(1); 134 } 135 136 // Free the memory allocated by the buffer. 137 buffer.into_owned().into_box().dealloc(); 138 } 139 } 140 } 141 142 /// Worker queue flavor: FIFO or LIFO. 143 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 144 enum Flavor { 145 /// The first-in first-out flavor. 146 Fifo, 147 148 /// The last-in first-out flavor. 149 Lifo, 150 } 151 152 /// A worker queue. 153 /// 154 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal 155 /// tasks from it. Task schedulers typically create a single worker queue per thread. 156 /// 157 /// # Examples 158 /// 159 /// A FIFO worker: 160 /// 161 /// ``` 162 /// use crossbeam_deque::{Steal, Worker}; 163 /// 164 /// let w = Worker::new_fifo(); 165 /// let s = w.stealer(); 166 /// 167 /// w.push(1); 168 /// w.push(2); 169 /// w.push(3); 170 /// 171 /// assert_eq!(s.steal(), Steal::Success(1)); 172 /// assert_eq!(w.pop(), Some(2)); 173 /// assert_eq!(w.pop(), Some(3)); 174 /// ``` 175 /// 176 /// A LIFO worker: 177 /// 178 /// ``` 179 /// use crossbeam_deque::{Steal, Worker}; 180 /// 181 /// let w = Worker::new_lifo(); 182 /// let s = w.stealer(); 183 /// 184 /// w.push(1); 185 /// w.push(2); 186 /// w.push(3); 187 /// 188 /// assert_eq!(s.steal(), Steal::Success(1)); 189 /// assert_eq!(w.pop(), Some(3)); 190 /// assert_eq!(w.pop(), Some(2)); 191 /// ``` 192 pub struct Worker<T> { 193 /// A reference to the inner representation of the queue. 194 inner: Arc<CachePadded<Inner<T>>>, 195 196 /// A copy of `inner.buffer` for quick access. 197 buffer: Cell<Buffer<T>>, 198 199 /// The flavor of the queue. 200 flavor: Flavor, 201 202 /// Indicates that the worker cannot be shared among threads. 203 _marker: PhantomData<*mut ()>, // !Send + !Sync 204 } 205 206 unsafe impl<T: Send> Send for Worker<T> {} 207 208 impl<T> Worker<T> { 209 /// Creates a FIFO worker queue. 210 /// 211 /// Tasks are pushed and popped from opposite ends. 212 /// 213 /// # Examples 214 /// 215 /// ``` 216 /// use crossbeam_deque::Worker; 217 /// 218 /// let w = Worker::<i32>::new_fifo(); 219 /// ``` new_fifo() -> Worker<T>220 pub fn new_fifo() -> Worker<T> { 221 let buffer = Buffer::alloc(MIN_CAP); 222 223 let inner = Arc::new(CachePadded::new(Inner { 224 front: AtomicIsize::new(0), 225 back: AtomicIsize::new(0), 226 buffer: CachePadded::new(Atomic::new(buffer)), 227 })); 228 229 Worker { 230 inner, 231 buffer: Cell::new(buffer), 232 flavor: Flavor::Fifo, 233 _marker: PhantomData, 234 } 235 } 236 237 /// Creates a LIFO worker queue. 238 /// 239 /// Tasks are pushed and popped from the same end. 240 /// 241 /// # Examples 242 /// 243 /// ``` 244 /// use crossbeam_deque::Worker; 245 /// 246 /// let w = Worker::<i32>::new_lifo(); 247 /// ``` new_lifo() -> Worker<T>248 pub fn new_lifo() -> Worker<T> { 249 let buffer = Buffer::alloc(MIN_CAP); 250 251 let inner = Arc::new(CachePadded::new(Inner { 252 front: AtomicIsize::new(0), 253 back: AtomicIsize::new(0), 254 buffer: CachePadded::new(Atomic::new(buffer)), 255 })); 256 257 Worker { 258 inner, 259 buffer: Cell::new(buffer), 260 flavor: Flavor::Lifo, 261 _marker: PhantomData, 262 } 263 } 264 265 /// Creates a stealer for this queue. 266 /// 267 /// The returned stealer can be shared among threads and cloned. 268 /// 269 /// # Examples 270 /// 271 /// ``` 272 /// use crossbeam_deque::Worker; 273 /// 274 /// let w = Worker::<i32>::new_lifo(); 275 /// let s = w.stealer(); 276 /// ``` stealer(&self) -> Stealer<T>277 pub fn stealer(&self) -> Stealer<T> { 278 Stealer { 279 inner: self.inner.clone(), 280 flavor: self.flavor, 281 } 282 } 283 284 /// Resizes the internal buffer to the new capacity of `new_cap`. 285 #[cold] resize(&self, new_cap: usize)286 unsafe fn resize(&self, new_cap: usize) { 287 // Load the back index, front index, and buffer. 288 let b = self.inner.back.load(Ordering::Relaxed); 289 let f = self.inner.front.load(Ordering::Relaxed); 290 let buffer = self.buffer.get(); 291 292 // Allocate a new buffer and copy data from the old buffer to the new one. 293 let new = Buffer::alloc(new_cap); 294 let mut i = f; 295 while i != b { 296 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1); 297 i = i.wrapping_add(1); 298 } 299 300 let guard = &epoch::pin(); 301 302 // Replace the old buffer with the new one. 303 self.buffer.replace(new); 304 let old = 305 self.inner 306 .buffer 307 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard); 308 309 // Destroy the old buffer later. 310 guard.defer_unchecked(move || old.into_owned().into_box().dealloc()); 311 312 // If the buffer is very large, then flush the thread-local garbage in order to deallocate 313 // it as soon as possible. 314 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES { 315 guard.flush(); 316 } 317 } 318 319 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the 320 /// buffer. reserve(&self, reserve_cap: usize)321 fn reserve(&self, reserve_cap: usize) { 322 if reserve_cap > 0 { 323 // Compute the current length. 324 let b = self.inner.back.load(Ordering::Relaxed); 325 let f = self.inner.front.load(Ordering::SeqCst); 326 let len = b.wrapping_sub(f) as usize; 327 328 // The current capacity. 329 let cap = self.buffer.get().cap; 330 331 // Is there enough capacity to push `reserve_cap` tasks? 332 if cap - len < reserve_cap { 333 // Keep doubling the capacity as much as is needed. 334 let mut new_cap = cap * 2; 335 while new_cap - len < reserve_cap { 336 new_cap *= 2; 337 } 338 339 // Resize the buffer. 340 unsafe { 341 self.resize(new_cap); 342 } 343 } 344 } 345 } 346 347 /// Returns `true` if the queue is empty. 348 /// 349 /// ``` 350 /// use crossbeam_deque::Worker; 351 /// 352 /// let w = Worker::new_lifo(); 353 /// 354 /// assert!(w.is_empty()); 355 /// w.push(1); 356 /// assert!(!w.is_empty()); 357 /// ``` is_empty(&self) -> bool358 pub fn is_empty(&self) -> bool { 359 let b = self.inner.back.load(Ordering::Relaxed); 360 let f = self.inner.front.load(Ordering::SeqCst); 361 b.wrapping_sub(f) <= 0 362 } 363 364 /// Returns the number of tasks in the deque. 365 /// 366 /// ``` 367 /// use crossbeam_deque::Worker; 368 /// 369 /// let w = Worker::new_lifo(); 370 /// 371 /// assert_eq!(w.len(), 0); 372 /// w.push(1); 373 /// assert_eq!(w.len(), 1); 374 /// w.push(1); 375 /// assert_eq!(w.len(), 2); 376 /// ``` len(&self) -> usize377 pub fn len(&self) -> usize { 378 let b = self.inner.back.load(Ordering::Relaxed); 379 let f = self.inner.front.load(Ordering::SeqCst); 380 b.wrapping_sub(f).max(0) as usize 381 } 382 383 /// Pushes a task into the queue. 384 /// 385 /// # Examples 386 /// 387 /// ``` 388 /// use crossbeam_deque::Worker; 389 /// 390 /// let w = Worker::new_lifo(); 391 /// w.push(1); 392 /// w.push(2); 393 /// ``` push(&self, task: T)394 pub fn push(&self, task: T) { 395 // Load the back index, front index, and buffer. 396 let b = self.inner.back.load(Ordering::Relaxed); 397 let f = self.inner.front.load(Ordering::Acquire); 398 let mut buffer = self.buffer.get(); 399 400 // Calculate the length of the queue. 401 let len = b.wrapping_sub(f); 402 403 // Is the queue full? 404 if len >= buffer.cap as isize { 405 // Yes. Grow the underlying buffer. 406 unsafe { 407 self.resize(2 * buffer.cap); 408 } 409 buffer = self.buffer.get(); 410 } 411 412 // Write `task` into the slot. 413 unsafe { 414 buffer.write(b, MaybeUninit::new(task)); 415 } 416 417 atomic::fence(Ordering::Release); 418 419 // Increment the back index. 420 // 421 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 422 // races because it doesn't understand fences. 423 self.inner.back.store(b.wrapping_add(1), Ordering::Release); 424 } 425 426 /// Pops a task from the queue. 427 /// 428 /// # Examples 429 /// 430 /// ``` 431 /// use crossbeam_deque::Worker; 432 /// 433 /// let w = Worker::new_fifo(); 434 /// w.push(1); 435 /// w.push(2); 436 /// 437 /// assert_eq!(w.pop(), Some(1)); 438 /// assert_eq!(w.pop(), Some(2)); 439 /// assert_eq!(w.pop(), None); 440 /// ``` pop(&self) -> Option<T>441 pub fn pop(&self) -> Option<T> { 442 // Load the back and front index. 443 let b = self.inner.back.load(Ordering::Relaxed); 444 let f = self.inner.front.load(Ordering::Relaxed); 445 446 // Calculate the length of the queue. 447 let len = b.wrapping_sub(f); 448 449 // Is the queue empty? 450 if len <= 0 { 451 return None; 452 } 453 454 match self.flavor { 455 // Pop from the front of the queue. 456 Flavor::Fifo => { 457 // Try incrementing the front index to pop the task. 458 let f = self.inner.front.fetch_add(1, Ordering::SeqCst); 459 let new_f = f.wrapping_add(1); 460 461 if b.wrapping_sub(new_f) < 0 { 462 self.inner.front.store(f, Ordering::Relaxed); 463 return None; 464 } 465 466 unsafe { 467 // Read the popped task. 468 let buffer = self.buffer.get(); 469 let task = buffer.read(f).assume_init(); 470 471 // Shrink the buffer if `len - 1` is less than one fourth of the capacity. 472 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { 473 self.resize(buffer.cap / 2); 474 } 475 476 Some(task) 477 } 478 } 479 480 // Pop from the back of the queue. 481 Flavor::Lifo => { 482 // Decrement the back index. 483 let b = b.wrapping_sub(1); 484 self.inner.back.store(b, Ordering::Relaxed); 485 486 atomic::fence(Ordering::SeqCst); 487 488 // Load the front index. 489 let f = self.inner.front.load(Ordering::Relaxed); 490 491 // Compute the length after the back index was decremented. 492 let len = b.wrapping_sub(f); 493 494 if len < 0 { 495 // The queue is empty. Restore the back index to the original task. 496 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); 497 None 498 } else { 499 // Read the task to be popped. 500 let buffer = self.buffer.get(); 501 let mut task = unsafe { Some(buffer.read(b)) }; 502 503 // Are we popping the last task from the queue? 504 if len == 0 { 505 // Try incrementing the front index. 506 if self 507 .inner 508 .front 509 .compare_exchange( 510 f, 511 f.wrapping_add(1), 512 Ordering::SeqCst, 513 Ordering::Relaxed, 514 ) 515 .is_err() 516 { 517 // Failed. We didn't pop anything. Reset to `None`. 518 task.take(); 519 } 520 521 // Restore the back index to the original task. 522 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); 523 } else { 524 // Shrink the buffer if `len` is less than one fourth of the capacity. 525 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 { 526 unsafe { 527 self.resize(buffer.cap / 2); 528 } 529 } 530 } 531 532 task.map(|t| unsafe { t.assume_init() }) 533 } 534 } 535 } 536 } 537 } 538 539 impl<T> fmt::Debug for Worker<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result540 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 541 f.pad("Worker { .. }") 542 } 543 } 544 545 /// A stealer handle of a worker queue. 546 /// 547 /// Stealers can be shared among threads. 548 /// 549 /// Task schedulers typically have a single worker queue per worker thread. 550 /// 551 /// # Examples 552 /// 553 /// ``` 554 /// use crossbeam_deque::{Steal, Worker}; 555 /// 556 /// let w = Worker::new_lifo(); 557 /// w.push(1); 558 /// w.push(2); 559 /// 560 /// let s = w.stealer(); 561 /// assert_eq!(s.steal(), Steal::Success(1)); 562 /// assert_eq!(s.steal(), Steal::Success(2)); 563 /// assert_eq!(s.steal(), Steal::Empty); 564 /// ``` 565 pub struct Stealer<T> { 566 /// A reference to the inner representation of the queue. 567 inner: Arc<CachePadded<Inner<T>>>, 568 569 /// The flavor of the queue. 570 flavor: Flavor, 571 } 572 573 unsafe impl<T: Send> Send for Stealer<T> {} 574 unsafe impl<T: Send> Sync for Stealer<T> {} 575 576 impl<T> Stealer<T> { 577 /// Returns `true` if the queue is empty. 578 /// 579 /// ``` 580 /// use crossbeam_deque::Worker; 581 /// 582 /// let w = Worker::new_lifo(); 583 /// let s = w.stealer(); 584 /// 585 /// assert!(s.is_empty()); 586 /// w.push(1); 587 /// assert!(!s.is_empty()); 588 /// ``` is_empty(&self) -> bool589 pub fn is_empty(&self) -> bool { 590 let f = self.inner.front.load(Ordering::Acquire); 591 atomic::fence(Ordering::SeqCst); 592 let b = self.inner.back.load(Ordering::Acquire); 593 b.wrapping_sub(f) <= 0 594 } 595 596 /// Returns the number of tasks in the deque. 597 /// 598 /// ``` 599 /// use crossbeam_deque::Worker; 600 /// 601 /// let w = Worker::new_lifo(); 602 /// let s = w.stealer(); 603 /// 604 /// assert_eq!(s.len(), 0); 605 /// w.push(1); 606 /// assert_eq!(s.len(), 1); 607 /// w.push(2); 608 /// assert_eq!(s.len(), 2); 609 /// ``` len(&self) -> usize610 pub fn len(&self) -> usize { 611 let f = self.inner.front.load(Ordering::Acquire); 612 atomic::fence(Ordering::SeqCst); 613 let b = self.inner.back.load(Ordering::Acquire); 614 b.wrapping_sub(f).max(0) as usize 615 } 616 617 /// Steals a task from the queue. 618 /// 619 /// # Examples 620 /// 621 /// ``` 622 /// use crossbeam_deque::{Steal, Worker}; 623 /// 624 /// let w = Worker::new_lifo(); 625 /// w.push(1); 626 /// w.push(2); 627 /// 628 /// let s = w.stealer(); 629 /// assert_eq!(s.steal(), Steal::Success(1)); 630 /// assert_eq!(s.steal(), Steal::Success(2)); 631 /// ``` steal(&self) -> Steal<T>632 pub fn steal(&self) -> Steal<T> { 633 // Load the front index. 634 let f = self.inner.front.load(Ordering::Acquire); 635 636 // A SeqCst fence is needed here. 637 // 638 // If the current thread is already pinned (reentrantly), we must manually issue the 639 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 640 // have to. 641 if epoch::is_pinned() { 642 atomic::fence(Ordering::SeqCst); 643 } 644 645 let guard = &epoch::pin(); 646 647 // Load the back index. 648 let b = self.inner.back.load(Ordering::Acquire); 649 650 // Is the queue empty? 651 if b.wrapping_sub(f) <= 0 { 652 return Steal::Empty; 653 } 654 655 // Load the buffer and read the task at the front. 656 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 657 let task = unsafe { buffer.deref().read(f) }; 658 659 // Try incrementing the front index to steal the task. 660 // If the buffer has been swapped or the increment fails, we retry. 661 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 662 || self 663 .inner 664 .front 665 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) 666 .is_err() 667 { 668 // We didn't steal this task, forget it. 669 return Steal::Retry; 670 } 671 672 // Return the stolen task. 673 Steal::Success(unsafe { task.assume_init() }) 674 } 675 676 /// Steals a batch of tasks and pushes them into another worker. 677 /// 678 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 679 /// steal around half of the tasks in the queue, but also not more than some constant limit. 680 /// 681 /// # Examples 682 /// 683 /// ``` 684 /// use crossbeam_deque::Worker; 685 /// 686 /// let w1 = Worker::new_fifo(); 687 /// w1.push(1); 688 /// w1.push(2); 689 /// w1.push(3); 690 /// w1.push(4); 691 /// 692 /// let s = w1.stealer(); 693 /// let w2 = Worker::new_fifo(); 694 /// 695 /// let _ = s.steal_batch(&w2); 696 /// assert_eq!(w2.pop(), Some(1)); 697 /// assert_eq!(w2.pop(), Some(2)); 698 /// ``` steal_batch(&self, dest: &Worker<T>) -> Steal<()>699 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { 700 self.steal_batch_with_limit(dest, MAX_BATCH) 701 } 702 703 /// Steals no more than `limit` of tasks and pushes them into another worker. 704 /// 705 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 706 /// steal around half of the tasks in the queue, but also not more than the given limit. 707 /// 708 /// # Examples 709 /// 710 /// ``` 711 /// use crossbeam_deque::Worker; 712 /// 713 /// let w1 = Worker::new_fifo(); 714 /// w1.push(1); 715 /// w1.push(2); 716 /// w1.push(3); 717 /// w1.push(4); 718 /// w1.push(5); 719 /// w1.push(6); 720 /// 721 /// let s = w1.stealer(); 722 /// let w2 = Worker::new_fifo(); 723 /// 724 /// let _ = s.steal_batch_with_limit(&w2, 2); 725 /// assert_eq!(w2.pop(), Some(1)); 726 /// assert_eq!(w2.pop(), Some(2)); 727 /// assert_eq!(w2.pop(), None); 728 /// 729 /// w1.push(7); 730 /// w1.push(8); 731 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 732 /// // half of the elements are currently popped, but the number of popped elements is considered 733 /// // an implementation detail that may be changed in the future. 734 /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX); 735 /// assert_eq!(w2.len(), 3); 736 /// ``` steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()>737 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { 738 assert!(limit > 0); 739 if Arc::ptr_eq(&self.inner, &dest.inner) { 740 if dest.is_empty() { 741 return Steal::Empty; 742 } else { 743 return Steal::Success(()); 744 } 745 } 746 747 // Load the front index. 748 let mut f = self.inner.front.load(Ordering::Acquire); 749 750 // A SeqCst fence is needed here. 751 // 752 // If the current thread is already pinned (reentrantly), we must manually issue the 753 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 754 // have to. 755 if epoch::is_pinned() { 756 atomic::fence(Ordering::SeqCst); 757 } 758 759 let guard = &epoch::pin(); 760 761 // Load the back index. 762 let b = self.inner.back.load(Ordering::Acquire); 763 764 // Is the queue empty? 765 let len = b.wrapping_sub(f); 766 if len <= 0 { 767 return Steal::Empty; 768 } 769 770 // Reserve capacity for the stolen batch. 771 let batch_size = cmp::min((len as usize + 1) / 2, limit); 772 dest.reserve(batch_size); 773 let mut batch_size = batch_size as isize; 774 775 // Get the destination buffer and back index. 776 let dest_buffer = dest.buffer.get(); 777 let mut dest_b = dest.inner.back.load(Ordering::Relaxed); 778 779 // Load the buffer. 780 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 781 782 match self.flavor { 783 // Steal a batch of tasks from the front at once. 784 Flavor::Fifo => { 785 // Copy the batch from the source to the destination buffer. 786 match dest.flavor { 787 Flavor::Fifo => { 788 for i in 0..batch_size { 789 unsafe { 790 let task = buffer.deref().read(f.wrapping_add(i)); 791 dest_buffer.write(dest_b.wrapping_add(i), task); 792 } 793 } 794 } 795 Flavor::Lifo => { 796 for i in 0..batch_size { 797 unsafe { 798 let task = buffer.deref().read(f.wrapping_add(i)); 799 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); 800 } 801 } 802 } 803 } 804 805 // Try incrementing the front index to steal the batch. 806 // If the buffer has been swapped or the increment fails, we retry. 807 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 808 || self 809 .inner 810 .front 811 .compare_exchange( 812 f, 813 f.wrapping_add(batch_size), 814 Ordering::SeqCst, 815 Ordering::Relaxed, 816 ) 817 .is_err() 818 { 819 return Steal::Retry; 820 } 821 822 dest_b = dest_b.wrapping_add(batch_size); 823 } 824 825 // Steal a batch of tasks from the front one by one. 826 Flavor::Lifo => { 827 // This loop may modify the batch_size, which triggers a clippy lint warning. 828 // Use a new variable to avoid the warning, and to make it clear we aren't 829 // modifying the loop exit condition during iteration. 830 let original_batch_size = batch_size; 831 832 for i in 0..original_batch_size { 833 // If this is not the first steal, check whether the queue is empty. 834 if i > 0 { 835 // We've already got the current front index. Now execute the fence to 836 // synchronize with other threads. 837 atomic::fence(Ordering::SeqCst); 838 839 // Load the back index. 840 let b = self.inner.back.load(Ordering::Acquire); 841 842 // Is the queue empty? 843 if b.wrapping_sub(f) <= 0 { 844 batch_size = i; 845 break; 846 } 847 } 848 849 // Read the task at the front. 850 let task = unsafe { buffer.deref().read(f) }; 851 852 // Try incrementing the front index to steal the task. 853 // If the buffer has been swapped or the increment fails, we retry. 854 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 855 || self 856 .inner 857 .front 858 .compare_exchange( 859 f, 860 f.wrapping_add(1), 861 Ordering::SeqCst, 862 Ordering::Relaxed, 863 ) 864 .is_err() 865 { 866 // We didn't steal this task, forget it and break from the loop. 867 batch_size = i; 868 break; 869 } 870 871 // Write the stolen task into the destination buffer. 872 unsafe { 873 dest_buffer.write(dest_b, task); 874 } 875 876 // Move the source front index and the destination back index one step forward. 877 f = f.wrapping_add(1); 878 dest_b = dest_b.wrapping_add(1); 879 } 880 881 // If we didn't steal anything, the operation needs to be retried. 882 if batch_size == 0 { 883 return Steal::Retry; 884 } 885 886 // If stealing into a FIFO queue, stolen tasks need to be reversed. 887 if dest.flavor == Flavor::Fifo { 888 for i in 0..batch_size / 2 { 889 unsafe { 890 let i1 = dest_b.wrapping_sub(batch_size - i); 891 let i2 = dest_b.wrapping_sub(i + 1); 892 let t1 = dest_buffer.read(i1); 893 let t2 = dest_buffer.read(i2); 894 dest_buffer.write(i1, t2); 895 dest_buffer.write(i2, t1); 896 } 897 } 898 } 899 } 900 } 901 902 atomic::fence(Ordering::Release); 903 904 // Update the back index in the destination queue. 905 // 906 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 907 // races because it doesn't understand fences. 908 dest.inner.back.store(dest_b, Ordering::Release); 909 910 // Return with success. 911 Steal::Success(()) 912 } 913 914 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker. 915 /// 916 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 917 /// steal around half of the tasks in the queue, but also not more than some constant limit. 918 /// 919 /// # Examples 920 /// 921 /// ``` 922 /// use crossbeam_deque::{Steal, Worker}; 923 /// 924 /// let w1 = Worker::new_fifo(); 925 /// w1.push(1); 926 /// w1.push(2); 927 /// w1.push(3); 928 /// w1.push(4); 929 /// 930 /// let s = w1.stealer(); 931 /// let w2 = Worker::new_fifo(); 932 /// 933 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1)); 934 /// assert_eq!(w2.pop(), Some(2)); 935 /// ``` steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>936 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { 937 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH) 938 } 939 940 /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from 941 /// that worker. 942 /// 943 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 944 /// steal around half of the tasks in the queue, but also not more than the given limit. 945 /// 946 /// # Examples 947 /// 948 /// ``` 949 /// use crossbeam_deque::{Steal, Worker}; 950 /// 951 /// let w1 = Worker::new_fifo(); 952 /// w1.push(1); 953 /// w1.push(2); 954 /// w1.push(3); 955 /// w1.push(4); 956 /// w1.push(5); 957 /// w1.push(6); 958 /// 959 /// let s = w1.stealer(); 960 /// let w2 = Worker::new_fifo(); 961 /// 962 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1)); 963 /// assert_eq!(w2.pop(), Some(2)); 964 /// assert_eq!(w2.pop(), None); 965 /// 966 /// w1.push(7); 967 /// w1.push(8); 968 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 969 /// // half of the elements are currently popped, but the number of popped elements is considered 970 /// // an implementation detail that may be changed in the future. 971 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3)); 972 /// assert_eq!(w2.pop(), Some(4)); 973 /// assert_eq!(w2.pop(), Some(5)); 974 /// assert_eq!(w2.pop(), None); 975 /// ``` steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T>976 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { 977 assert!(limit > 0); 978 if Arc::ptr_eq(&self.inner, &dest.inner) { 979 match dest.pop() { 980 None => return Steal::Empty, 981 Some(task) => return Steal::Success(task), 982 } 983 } 984 985 // Load the front index. 986 let mut f = self.inner.front.load(Ordering::Acquire); 987 988 // A SeqCst fence is needed here. 989 // 990 // If the current thread is already pinned (reentrantly), we must manually issue the 991 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't 992 // have to. 993 if epoch::is_pinned() { 994 atomic::fence(Ordering::SeqCst); 995 } 996 997 let guard = &epoch::pin(); 998 999 // Load the back index. 1000 let b = self.inner.back.load(Ordering::Acquire); 1001 1002 // Is the queue empty? 1003 let len = b.wrapping_sub(f); 1004 if len <= 0 { 1005 return Steal::Empty; 1006 } 1007 1008 // Reserve capacity for the stolen batch. 1009 let batch_size = cmp::min((len as usize - 1) / 2, limit - 1); 1010 dest.reserve(batch_size); 1011 let mut batch_size = batch_size as isize; 1012 1013 // Get the destination buffer and back index. 1014 let dest_buffer = dest.buffer.get(); 1015 let mut dest_b = dest.inner.back.load(Ordering::Relaxed); 1016 1017 // Load the buffer 1018 let buffer = self.inner.buffer.load(Ordering::Acquire, guard); 1019 1020 // Read the task at the front. 1021 let mut task = unsafe { buffer.deref().read(f) }; 1022 1023 match self.flavor { 1024 // Steal a batch of tasks from the front at once. 1025 Flavor::Fifo => { 1026 // Copy the batch from the source to the destination buffer. 1027 match dest.flavor { 1028 Flavor::Fifo => { 1029 for i in 0..batch_size { 1030 unsafe { 1031 let task = buffer.deref().read(f.wrapping_add(i + 1)); 1032 dest_buffer.write(dest_b.wrapping_add(i), task); 1033 } 1034 } 1035 } 1036 Flavor::Lifo => { 1037 for i in 0..batch_size { 1038 unsafe { 1039 let task = buffer.deref().read(f.wrapping_add(i + 1)); 1040 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); 1041 } 1042 } 1043 } 1044 } 1045 1046 // Try incrementing the front index to steal the task. 1047 // If the buffer has been swapped or the increment fails, we retry. 1048 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 1049 || self 1050 .inner 1051 .front 1052 .compare_exchange( 1053 f, 1054 f.wrapping_add(batch_size + 1), 1055 Ordering::SeqCst, 1056 Ordering::Relaxed, 1057 ) 1058 .is_err() 1059 { 1060 // We didn't steal this task, forget it. 1061 return Steal::Retry; 1062 } 1063 1064 dest_b = dest_b.wrapping_add(batch_size); 1065 } 1066 1067 // Steal a batch of tasks from the front one by one. 1068 Flavor::Lifo => { 1069 // Try incrementing the front index to steal the task. 1070 if self 1071 .inner 1072 .front 1073 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) 1074 .is_err() 1075 { 1076 // We didn't steal this task, forget it. 1077 return Steal::Retry; 1078 } 1079 1080 // Move the front index one step forward. 1081 f = f.wrapping_add(1); 1082 1083 // Repeat the same procedure for the batch steals. 1084 // 1085 // This loop may modify the batch_size, which triggers a clippy lint warning. 1086 // Use a new variable to avoid the warning, and to make it clear we aren't 1087 // modifying the loop exit condition during iteration. 1088 let original_batch_size = batch_size; 1089 for i in 0..original_batch_size { 1090 // We've already got the current front index. Now execute the fence to 1091 // synchronize with other threads. 1092 atomic::fence(Ordering::SeqCst); 1093 1094 // Load the back index. 1095 let b = self.inner.back.load(Ordering::Acquire); 1096 1097 // Is the queue empty? 1098 if b.wrapping_sub(f) <= 0 { 1099 batch_size = i; 1100 break; 1101 } 1102 1103 // Read the task at the front. 1104 let tmp = unsafe { buffer.deref().read(f) }; 1105 1106 // Try incrementing the front index to steal the task. 1107 // If the buffer has been swapped or the increment fails, we retry. 1108 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer 1109 || self 1110 .inner 1111 .front 1112 .compare_exchange( 1113 f, 1114 f.wrapping_add(1), 1115 Ordering::SeqCst, 1116 Ordering::Relaxed, 1117 ) 1118 .is_err() 1119 { 1120 // We didn't steal this task, forget it and break from the loop. 1121 batch_size = i; 1122 break; 1123 } 1124 1125 // Write the previously stolen task into the destination buffer. 1126 unsafe { 1127 dest_buffer.write(dest_b, mem::replace(&mut task, tmp)); 1128 } 1129 1130 // Move the source front index and the destination back index one step forward. 1131 f = f.wrapping_add(1); 1132 dest_b = dest_b.wrapping_add(1); 1133 } 1134 1135 // If stealing into a FIFO queue, stolen tasks need to be reversed. 1136 if dest.flavor == Flavor::Fifo { 1137 for i in 0..batch_size / 2 { 1138 unsafe { 1139 let i1 = dest_b.wrapping_sub(batch_size - i); 1140 let i2 = dest_b.wrapping_sub(i + 1); 1141 let t1 = dest_buffer.read(i1); 1142 let t2 = dest_buffer.read(i2); 1143 dest_buffer.write(i1, t2); 1144 dest_buffer.write(i2, t1); 1145 } 1146 } 1147 } 1148 } 1149 } 1150 1151 atomic::fence(Ordering::Release); 1152 1153 // Update the back index in the destination queue. 1154 // 1155 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data 1156 // races because it doesn't understand fences. 1157 dest.inner.back.store(dest_b, Ordering::Release); 1158 1159 // Return with success. 1160 Steal::Success(unsafe { task.assume_init() }) 1161 } 1162 } 1163 1164 impl<T> Clone for Stealer<T> { clone(&self) -> Stealer<T>1165 fn clone(&self) -> Stealer<T> { 1166 Stealer { 1167 inner: self.inner.clone(), 1168 flavor: self.flavor, 1169 } 1170 } 1171 } 1172 1173 impl<T> fmt::Debug for Stealer<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1175 f.pad("Stealer { .. }") 1176 } 1177 } 1178 1179 // Bits indicating the state of a slot: 1180 // * If a task has been written into the slot, `WRITE` is set. 1181 // * If a task has been read from the slot, `READ` is set. 1182 // * If the block is being destroyed, `DESTROY` is set. 1183 const WRITE: usize = 1; 1184 const READ: usize = 2; 1185 const DESTROY: usize = 4; 1186 1187 // Each block covers one "lap" of indices. 1188 const LAP: usize = 64; 1189 // The maximum number of values a block can hold. 1190 const BLOCK_CAP: usize = LAP - 1; 1191 // How many lower bits are reserved for metadata. 1192 const SHIFT: usize = 1; 1193 // Indicates that the block is not the last one. 1194 const HAS_NEXT: usize = 1; 1195 1196 /// A slot in a block. 1197 struct Slot<T> { 1198 /// The task. 1199 task: UnsafeCell<MaybeUninit<T>>, 1200 1201 /// The state of the slot. 1202 state: AtomicUsize, 1203 } 1204 1205 impl<T> Slot<T> { 1206 const UNINIT: Self = Self { 1207 task: UnsafeCell::new(MaybeUninit::uninit()), 1208 state: AtomicUsize::new(0), 1209 }; 1210 1211 /// Waits until a task is written into the slot. wait_write(&self)1212 fn wait_write(&self) { 1213 let backoff = Backoff::new(); 1214 while self.state.load(Ordering::Acquire) & WRITE == 0 { 1215 backoff.snooze(); 1216 } 1217 } 1218 } 1219 1220 /// A block in a linked list. 1221 /// 1222 /// Each block in the list can hold up to `BLOCK_CAP` values. 1223 struct Block<T> { 1224 /// The next block in the linked list. 1225 next: AtomicPtr<Block<T>>, 1226 1227 /// Slots for values. 1228 slots: [Slot<T>; BLOCK_CAP], 1229 } 1230 1231 impl<T> Block<T> { 1232 /// Creates an empty block that starts at `start_index`. new() -> Block<T>1233 fn new() -> Block<T> { 1234 Self { 1235 next: AtomicPtr::new(ptr::null_mut()), 1236 slots: [Slot::UNINIT; BLOCK_CAP], 1237 } 1238 } 1239 1240 /// Waits until the next pointer is set. wait_next(&self) -> *mut Block<T>1241 fn wait_next(&self) -> *mut Block<T> { 1242 let backoff = Backoff::new(); 1243 loop { 1244 let next = self.next.load(Ordering::Acquire); 1245 if !next.is_null() { 1246 return next; 1247 } 1248 backoff.snooze(); 1249 } 1250 } 1251 1252 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. destroy(this: *mut Block<T>, count: usize)1253 unsafe fn destroy(this: *mut Block<T>, count: usize) { 1254 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has 1255 // begun destruction of the block. 1256 for i in (0..count).rev() { 1257 let slot = (*this).slots.get_unchecked(i); 1258 1259 // Mark the `DESTROY` bit if a thread is still using the slot. 1260 if slot.state.load(Ordering::Acquire) & READ == 0 1261 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 1262 { 1263 // If a thread is still using the slot, it will continue destruction of the block. 1264 return; 1265 } 1266 } 1267 1268 // No thread is using the block, now it is safe to destroy it. 1269 drop(Box::from_raw(this)); 1270 } 1271 } 1272 1273 /// A position in a queue. 1274 struct Position<T> { 1275 /// The index in the queue. 1276 index: AtomicUsize, 1277 1278 /// The block in the linked list. 1279 block: AtomicPtr<Block<T>>, 1280 } 1281 1282 /// An injector queue. 1283 /// 1284 /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have 1285 /// a single injector queue, which is the entry point for new tasks. 1286 /// 1287 /// # Examples 1288 /// 1289 /// ``` 1290 /// use crossbeam_deque::{Injector, Steal}; 1291 /// 1292 /// let q = Injector::new(); 1293 /// q.push(1); 1294 /// q.push(2); 1295 /// 1296 /// assert_eq!(q.steal(), Steal::Success(1)); 1297 /// assert_eq!(q.steal(), Steal::Success(2)); 1298 /// assert_eq!(q.steal(), Steal::Empty); 1299 /// ``` 1300 pub struct Injector<T> { 1301 /// The head of the queue. 1302 head: CachePadded<Position<T>>, 1303 1304 /// The tail of the queue. 1305 tail: CachePadded<Position<T>>, 1306 1307 /// Indicates that dropping a `Injector<T>` may drop values of type `T`. 1308 _marker: PhantomData<T>, 1309 } 1310 1311 unsafe impl<T: Send> Send for Injector<T> {} 1312 unsafe impl<T: Send> Sync for Injector<T> {} 1313 1314 impl<T> Default for Injector<T> { default() -> Self1315 fn default() -> Self { 1316 let block = Box::into_raw(Box::new(Block::<T>::new())); 1317 Self { 1318 head: CachePadded::new(Position { 1319 block: AtomicPtr::new(block), 1320 index: AtomicUsize::new(0), 1321 }), 1322 tail: CachePadded::new(Position { 1323 block: AtomicPtr::new(block), 1324 index: AtomicUsize::new(0), 1325 }), 1326 _marker: PhantomData, 1327 } 1328 } 1329 } 1330 1331 impl<T> Injector<T> { 1332 /// Creates a new injector queue. 1333 /// 1334 /// # Examples 1335 /// 1336 /// ``` 1337 /// use crossbeam_deque::Injector; 1338 /// 1339 /// let q = Injector::<i32>::new(); 1340 /// ``` new() -> Injector<T>1341 pub fn new() -> Injector<T> { 1342 Self::default() 1343 } 1344 1345 /// Pushes a task into the queue. 1346 /// 1347 /// # Examples 1348 /// 1349 /// ``` 1350 /// use crossbeam_deque::Injector; 1351 /// 1352 /// let w = Injector::new(); 1353 /// w.push(1); 1354 /// w.push(2); 1355 /// ``` push(&self, task: T)1356 pub fn push(&self, task: T) { 1357 let backoff = Backoff::new(); 1358 let mut tail = self.tail.index.load(Ordering::Acquire); 1359 let mut block = self.tail.block.load(Ordering::Acquire); 1360 let mut next_block = None; 1361 1362 loop { 1363 // Calculate the offset of the index into the block. 1364 let offset = (tail >> SHIFT) % LAP; 1365 1366 // If we reached the end of the block, wait until the next one is installed. 1367 if offset == BLOCK_CAP { 1368 backoff.snooze(); 1369 tail = self.tail.index.load(Ordering::Acquire); 1370 block = self.tail.block.load(Ordering::Acquire); 1371 continue; 1372 } 1373 1374 // If we're going to have to install the next block, allocate it in advance in order to 1375 // make the wait for other threads as short as possible. 1376 if offset + 1 == BLOCK_CAP && next_block.is_none() { 1377 next_block = Some(Box::new(Block::<T>::new())); 1378 } 1379 1380 let new_tail = tail + (1 << SHIFT); 1381 1382 // Try advancing the tail forward. 1383 match self.tail.index.compare_exchange_weak( 1384 tail, 1385 new_tail, 1386 Ordering::SeqCst, 1387 Ordering::Acquire, 1388 ) { 1389 Ok(_) => unsafe { 1390 // If we've reached the end of the block, install the next one. 1391 if offset + 1 == BLOCK_CAP { 1392 let next_block = Box::into_raw(next_block.unwrap()); 1393 let next_index = new_tail.wrapping_add(1 << SHIFT); 1394 1395 self.tail.block.store(next_block, Ordering::Release); 1396 self.tail.index.store(next_index, Ordering::Release); 1397 (*block).next.store(next_block, Ordering::Release); 1398 } 1399 1400 // Write the task into the slot. 1401 let slot = (*block).slots.get_unchecked(offset); 1402 slot.task.get().write(MaybeUninit::new(task)); 1403 slot.state.fetch_or(WRITE, Ordering::Release); 1404 1405 return; 1406 }, 1407 Err(t) => { 1408 tail = t; 1409 block = self.tail.block.load(Ordering::Acquire); 1410 backoff.spin(); 1411 } 1412 } 1413 } 1414 } 1415 1416 /// Steals a task from the queue. 1417 /// 1418 /// # Examples 1419 /// 1420 /// ``` 1421 /// use crossbeam_deque::{Injector, Steal}; 1422 /// 1423 /// let q = Injector::new(); 1424 /// q.push(1); 1425 /// q.push(2); 1426 /// 1427 /// assert_eq!(q.steal(), Steal::Success(1)); 1428 /// assert_eq!(q.steal(), Steal::Success(2)); 1429 /// assert_eq!(q.steal(), Steal::Empty); 1430 /// ``` steal(&self) -> Steal<T>1431 pub fn steal(&self) -> Steal<T> { 1432 let mut head; 1433 let mut block; 1434 let mut offset; 1435 1436 let backoff = Backoff::new(); 1437 loop { 1438 head = self.head.index.load(Ordering::Acquire); 1439 block = self.head.block.load(Ordering::Acquire); 1440 1441 // Calculate the offset of the index into the block. 1442 offset = (head >> SHIFT) % LAP; 1443 1444 // If we reached the end of the block, wait until the next one is installed. 1445 if offset == BLOCK_CAP { 1446 backoff.snooze(); 1447 } else { 1448 break; 1449 } 1450 } 1451 1452 let mut new_head = head + (1 << SHIFT); 1453 1454 if new_head & HAS_NEXT == 0 { 1455 atomic::fence(Ordering::SeqCst); 1456 let tail = self.tail.index.load(Ordering::Relaxed); 1457 1458 // If the tail equals the head, that means the queue is empty. 1459 if head >> SHIFT == tail >> SHIFT { 1460 return Steal::Empty; 1461 } 1462 1463 // If head and tail are not in the same block, set `HAS_NEXT` in head. 1464 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1465 new_head |= HAS_NEXT; 1466 } 1467 } 1468 1469 // Try moving the head index forward. 1470 if self 1471 .head 1472 .index 1473 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1474 .is_err() 1475 { 1476 return Steal::Retry; 1477 } 1478 1479 unsafe { 1480 // If we've reached the end of the block, move to the next one. 1481 if offset + 1 == BLOCK_CAP { 1482 let next = (*block).wait_next(); 1483 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1484 if !(*next).next.load(Ordering::Relaxed).is_null() { 1485 next_index |= HAS_NEXT; 1486 } 1487 1488 self.head.block.store(next, Ordering::Release); 1489 self.head.index.store(next_index, Ordering::Release); 1490 } 1491 1492 // Read the task. 1493 let slot = (*block).slots.get_unchecked(offset); 1494 slot.wait_write(); 1495 let task = slot.task.get().read().assume_init(); 1496 1497 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1498 // but couldn't because we were busy reading from the slot. 1499 if (offset + 1 == BLOCK_CAP) 1500 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) 1501 { 1502 Block::destroy(block, offset); 1503 } 1504 1505 Steal::Success(task) 1506 } 1507 } 1508 1509 /// Steals a batch of tasks and pushes them into a worker. 1510 /// 1511 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1512 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1513 /// 1514 /// # Examples 1515 /// 1516 /// ``` 1517 /// use crossbeam_deque::{Injector, Worker}; 1518 /// 1519 /// let q = Injector::new(); 1520 /// q.push(1); 1521 /// q.push(2); 1522 /// q.push(3); 1523 /// q.push(4); 1524 /// 1525 /// let w = Worker::new_fifo(); 1526 /// let _ = q.steal_batch(&w); 1527 /// assert_eq!(w.pop(), Some(1)); 1528 /// assert_eq!(w.pop(), Some(2)); 1529 /// ``` steal_batch(&self, dest: &Worker<T>) -> Steal<()>1530 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> { 1531 self.steal_batch_with_limit(dest, MAX_BATCH) 1532 } 1533 1534 /// Steals no more than of tasks and pushes them into a worker. 1535 /// 1536 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1537 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1538 /// 1539 /// # Examples 1540 /// 1541 /// ``` 1542 /// use crossbeam_deque::{Injector, Worker}; 1543 /// 1544 /// let q = Injector::new(); 1545 /// q.push(1); 1546 /// q.push(2); 1547 /// q.push(3); 1548 /// q.push(4); 1549 /// q.push(5); 1550 /// q.push(6); 1551 /// 1552 /// let w = Worker::new_fifo(); 1553 /// let _ = q.steal_batch_with_limit(&w, 2); 1554 /// assert_eq!(w.pop(), Some(1)); 1555 /// assert_eq!(w.pop(), Some(2)); 1556 /// assert_eq!(w.pop(), None); 1557 /// 1558 /// q.push(7); 1559 /// q.push(8); 1560 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 1561 /// // half of the elements are currently popped, but the number of popped elements is considered 1562 /// // an implementation detail that may be changed in the future. 1563 /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX); 1564 /// assert_eq!(w.len(), 3); 1565 /// ``` steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()>1566 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> { 1567 assert!(limit > 0); 1568 let mut head; 1569 let mut block; 1570 let mut offset; 1571 1572 let backoff = Backoff::new(); 1573 loop { 1574 head = self.head.index.load(Ordering::Acquire); 1575 block = self.head.block.load(Ordering::Acquire); 1576 1577 // Calculate the offset of the index into the block. 1578 offset = (head >> SHIFT) % LAP; 1579 1580 // If we reached the end of the block, wait until the next one is installed. 1581 if offset == BLOCK_CAP { 1582 backoff.snooze(); 1583 } else { 1584 break; 1585 } 1586 } 1587 1588 let mut new_head = head; 1589 let advance; 1590 1591 if new_head & HAS_NEXT == 0 { 1592 atomic::fence(Ordering::SeqCst); 1593 let tail = self.tail.index.load(Ordering::Relaxed); 1594 1595 // If the tail equals the head, that means the queue is empty. 1596 if head >> SHIFT == tail >> SHIFT { 1597 return Steal::Empty; 1598 } 1599 1600 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate 1601 // the right batch size to steal. 1602 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1603 new_head |= HAS_NEXT; 1604 // We can steal all tasks till the end of the block. 1605 advance = (BLOCK_CAP - offset).min(limit); 1606 } else { 1607 let len = (tail - head) >> SHIFT; 1608 // Steal half of the available tasks. 1609 advance = ((len + 1) / 2).min(limit); 1610 } 1611 } else { 1612 // We can steal all tasks till the end of the block. 1613 advance = (BLOCK_CAP - offset).min(limit); 1614 } 1615 1616 new_head += advance << SHIFT; 1617 let new_offset = offset + advance; 1618 1619 // Try moving the head index forward. 1620 if self 1621 .head 1622 .index 1623 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1624 .is_err() 1625 { 1626 return Steal::Retry; 1627 } 1628 1629 // Reserve capacity for the stolen batch. 1630 let batch_size = new_offset - offset; 1631 dest.reserve(batch_size); 1632 1633 // Get the destination buffer and back index. 1634 let dest_buffer = dest.buffer.get(); 1635 let dest_b = dest.inner.back.load(Ordering::Relaxed); 1636 1637 unsafe { 1638 // If we've reached the end of the block, move to the next one. 1639 if new_offset == BLOCK_CAP { 1640 let next = (*block).wait_next(); 1641 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1642 if !(*next).next.load(Ordering::Relaxed).is_null() { 1643 next_index |= HAS_NEXT; 1644 } 1645 1646 self.head.block.store(next, Ordering::Release); 1647 self.head.index.store(next_index, Ordering::Release); 1648 } 1649 1650 // Copy values from the injector into the destination queue. 1651 match dest.flavor { 1652 Flavor::Fifo => { 1653 for i in 0..batch_size { 1654 // Read the task. 1655 let slot = (*block).slots.get_unchecked(offset + i); 1656 slot.wait_write(); 1657 let task = slot.task.get().read(); 1658 1659 // Write it into the destination queue. 1660 dest_buffer.write(dest_b.wrapping_add(i as isize), task); 1661 } 1662 } 1663 1664 Flavor::Lifo => { 1665 for i in 0..batch_size { 1666 // Read the task. 1667 let slot = (*block).slots.get_unchecked(offset + i); 1668 slot.wait_write(); 1669 let task = slot.task.get().read(); 1670 1671 // Write it into the destination queue. 1672 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); 1673 } 1674 } 1675 } 1676 1677 atomic::fence(Ordering::Release); 1678 1679 // Update the back index in the destination queue. 1680 // 1681 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report 1682 // data races because it doesn't understand fences. 1683 dest.inner 1684 .back 1685 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); 1686 1687 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1688 // but couldn't because we were busy reading from the slot. 1689 if new_offset == BLOCK_CAP { 1690 Block::destroy(block, offset); 1691 } else { 1692 for i in offset..new_offset { 1693 let slot = (*block).slots.get_unchecked(i); 1694 1695 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 1696 Block::destroy(block, offset); 1697 break; 1698 } 1699 } 1700 } 1701 1702 Steal::Success(()) 1703 } 1704 } 1705 1706 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker. 1707 /// 1708 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1709 /// steal around half of the tasks in the queue, but also not more than some constant limit. 1710 /// 1711 /// # Examples 1712 /// 1713 /// ``` 1714 /// use crossbeam_deque::{Injector, Steal, Worker}; 1715 /// 1716 /// let q = Injector::new(); 1717 /// q.push(1); 1718 /// q.push(2); 1719 /// q.push(3); 1720 /// q.push(4); 1721 /// 1722 /// let w = Worker::new_fifo(); 1723 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1)); 1724 /// assert_eq!(w.pop(), Some(2)); 1725 /// ``` steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>1726 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> { 1727 // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly 1728 // better, but we may change it in the future to be compatible with the same method in Stealer. 1729 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1) 1730 } 1731 1732 /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker. 1733 /// 1734 /// How many tasks exactly will be stolen is not specified. That said, this method will try to 1735 /// steal around half of the tasks in the queue, but also not more than the given limit. 1736 /// 1737 /// # Examples 1738 /// 1739 /// ``` 1740 /// use crossbeam_deque::{Injector, Steal, Worker}; 1741 /// 1742 /// let q = Injector::new(); 1743 /// q.push(1); 1744 /// q.push(2); 1745 /// q.push(3); 1746 /// q.push(4); 1747 /// q.push(5); 1748 /// q.push(6); 1749 /// 1750 /// let w = Worker::new_fifo(); 1751 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1)); 1752 /// assert_eq!(w.pop(), Some(2)); 1753 /// assert_eq!(w.pop(), None); 1754 /// 1755 /// q.push(7); 1756 /// // Setting a large limit does not guarantee that all elements will be popped. In this case, 1757 /// // half of the elements are currently popped, but the number of popped elements is considered 1758 /// // an implementation detail that may be changed in the future. 1759 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3)); 1760 /// assert_eq!(w.pop(), Some(4)); 1761 /// assert_eq!(w.pop(), Some(5)); 1762 /// assert_eq!(w.pop(), None); 1763 /// ``` steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T>1764 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> { 1765 assert!(limit > 0); 1766 let mut head; 1767 let mut block; 1768 let mut offset; 1769 1770 let backoff = Backoff::new(); 1771 loop { 1772 head = self.head.index.load(Ordering::Acquire); 1773 block = self.head.block.load(Ordering::Acquire); 1774 1775 // Calculate the offset of the index into the block. 1776 offset = (head >> SHIFT) % LAP; 1777 1778 // If we reached the end of the block, wait until the next one is installed. 1779 if offset == BLOCK_CAP { 1780 backoff.snooze(); 1781 } else { 1782 break; 1783 } 1784 } 1785 1786 let mut new_head = head; 1787 let advance; 1788 1789 if new_head & HAS_NEXT == 0 { 1790 atomic::fence(Ordering::SeqCst); 1791 let tail = self.tail.index.load(Ordering::Relaxed); 1792 1793 // If the tail equals the head, that means the queue is empty. 1794 if head >> SHIFT == tail >> SHIFT { 1795 return Steal::Empty; 1796 } 1797 1798 // If head and tail are not in the same block, set `HAS_NEXT` in head. 1799 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 1800 new_head |= HAS_NEXT; 1801 // We can steal all tasks till the end of the block. 1802 advance = (BLOCK_CAP - offset).min(limit); 1803 } else { 1804 let len = (tail - head) >> SHIFT; 1805 // Steal half of the available tasks. 1806 advance = ((len + 1) / 2).min(limit); 1807 } 1808 } else { 1809 // We can steal all tasks till the end of the block. 1810 advance = (BLOCK_CAP - offset).min(limit); 1811 } 1812 1813 new_head += advance << SHIFT; 1814 let new_offset = offset + advance; 1815 1816 // Try moving the head index forward. 1817 if self 1818 .head 1819 .index 1820 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire) 1821 .is_err() 1822 { 1823 return Steal::Retry; 1824 } 1825 1826 // Reserve capacity for the stolen batch. 1827 let batch_size = new_offset - offset - 1; 1828 dest.reserve(batch_size); 1829 1830 // Get the destination buffer and back index. 1831 let dest_buffer = dest.buffer.get(); 1832 let dest_b = dest.inner.back.load(Ordering::Relaxed); 1833 1834 unsafe { 1835 // If we've reached the end of the block, move to the next one. 1836 if new_offset == BLOCK_CAP { 1837 let next = (*block).wait_next(); 1838 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); 1839 if !(*next).next.load(Ordering::Relaxed).is_null() { 1840 next_index |= HAS_NEXT; 1841 } 1842 1843 self.head.block.store(next, Ordering::Release); 1844 self.head.index.store(next_index, Ordering::Release); 1845 } 1846 1847 // Read the task. 1848 let slot = (*block).slots.get_unchecked(offset); 1849 slot.wait_write(); 1850 let task = slot.task.get().read(); 1851 1852 match dest.flavor { 1853 Flavor::Fifo => { 1854 // Copy values from the injector into the destination queue. 1855 for i in 0..batch_size { 1856 // Read the task. 1857 let slot = (*block).slots.get_unchecked(offset + i + 1); 1858 slot.wait_write(); 1859 let task = slot.task.get().read(); 1860 1861 // Write it into the destination queue. 1862 dest_buffer.write(dest_b.wrapping_add(i as isize), task); 1863 } 1864 } 1865 1866 Flavor::Lifo => { 1867 // Copy values from the injector into the destination queue. 1868 for i in 0..batch_size { 1869 // Read the task. 1870 let slot = (*block).slots.get_unchecked(offset + i + 1); 1871 slot.wait_write(); 1872 let task = slot.task.get().read(); 1873 1874 // Write it into the destination queue. 1875 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); 1876 } 1877 } 1878 } 1879 1880 atomic::fence(Ordering::Release); 1881 1882 // Update the back index in the destination queue. 1883 // 1884 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report 1885 // data races because it doesn't understand fences. 1886 dest.inner 1887 .back 1888 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release); 1889 1890 // Destroy the block if we've reached the end, or if another thread wanted to destroy 1891 // but couldn't because we were busy reading from the slot. 1892 if new_offset == BLOCK_CAP { 1893 Block::destroy(block, offset); 1894 } else { 1895 for i in offset..new_offset { 1896 let slot = (*block).slots.get_unchecked(i); 1897 1898 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 1899 Block::destroy(block, offset); 1900 break; 1901 } 1902 } 1903 } 1904 1905 Steal::Success(task.assume_init()) 1906 } 1907 } 1908 1909 /// Returns `true` if the queue is empty. 1910 /// 1911 /// # Examples 1912 /// 1913 /// ``` 1914 /// use crossbeam_deque::Injector; 1915 /// 1916 /// let q = Injector::new(); 1917 /// 1918 /// assert!(q.is_empty()); 1919 /// q.push(1); 1920 /// assert!(!q.is_empty()); 1921 /// ``` is_empty(&self) -> bool1922 pub fn is_empty(&self) -> bool { 1923 let head = self.head.index.load(Ordering::SeqCst); 1924 let tail = self.tail.index.load(Ordering::SeqCst); 1925 head >> SHIFT == tail >> SHIFT 1926 } 1927 1928 /// Returns the number of tasks in the queue. 1929 /// 1930 /// # Examples 1931 /// 1932 /// ``` 1933 /// use crossbeam_deque::Injector; 1934 /// 1935 /// let q = Injector::new(); 1936 /// 1937 /// assert_eq!(q.len(), 0); 1938 /// q.push(1); 1939 /// assert_eq!(q.len(), 1); 1940 /// q.push(1); 1941 /// assert_eq!(q.len(), 2); 1942 /// ``` len(&self) -> usize1943 pub fn len(&self) -> usize { 1944 loop { 1945 // Load the tail index, then load the head index. 1946 let mut tail = self.tail.index.load(Ordering::SeqCst); 1947 let mut head = self.head.index.load(Ordering::SeqCst); 1948 1949 // If the tail index didn't change, we've got consistent indices to work with. 1950 if self.tail.index.load(Ordering::SeqCst) == tail { 1951 // Erase the lower bits. 1952 tail &= !((1 << SHIFT) - 1); 1953 head &= !((1 << SHIFT) - 1); 1954 1955 // Fix up indices if they fall onto block ends. 1956 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { 1957 tail = tail.wrapping_add(1 << SHIFT); 1958 } 1959 if (head >> SHIFT) & (LAP - 1) == LAP - 1 { 1960 head = head.wrapping_add(1 << SHIFT); 1961 } 1962 1963 // Rotate indices so that head falls into the first block. 1964 let lap = (head >> SHIFT) / LAP; 1965 tail = tail.wrapping_sub((lap * LAP) << SHIFT); 1966 head = head.wrapping_sub((lap * LAP) << SHIFT); 1967 1968 // Remove the lower bits. 1969 tail >>= SHIFT; 1970 head >>= SHIFT; 1971 1972 // Return the difference minus the number of blocks between tail and head. 1973 return tail - head - tail / LAP; 1974 } 1975 } 1976 } 1977 } 1978 1979 impl<T> Drop for Injector<T> { drop(&mut self)1980 fn drop(&mut self) { 1981 let mut head = *self.head.index.get_mut(); 1982 let mut tail = *self.tail.index.get_mut(); 1983 let mut block = *self.head.block.get_mut(); 1984 1985 // Erase the lower bits. 1986 head &= !((1 << SHIFT) - 1); 1987 tail &= !((1 << SHIFT) - 1); 1988 1989 unsafe { 1990 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. 1991 while head != tail { 1992 let offset = (head >> SHIFT) % LAP; 1993 1994 if offset < BLOCK_CAP { 1995 // Drop the task in the slot. 1996 let slot = (*block).slots.get_unchecked(offset); 1997 (*slot.task.get()).assume_init_drop(); 1998 } else { 1999 // Deallocate the block and move to the next one. 2000 let next = *(*block).next.get_mut(); 2001 drop(Box::from_raw(block)); 2002 block = next; 2003 } 2004 2005 head = head.wrapping_add(1 << SHIFT); 2006 } 2007 2008 // Deallocate the last remaining block. 2009 drop(Box::from_raw(block)); 2010 } 2011 } 2012 } 2013 2014 impl<T> fmt::Debug for Injector<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result2015 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 2016 f.pad("Worker { .. }") 2017 } 2018 } 2019 2020 /// Possible outcomes of a steal operation. 2021 /// 2022 /// # Examples 2023 /// 2024 /// There are lots of ways to chain results of steal operations together: 2025 /// 2026 /// ``` 2027 /// use crossbeam_deque::Steal::{self, Empty, Retry, Success}; 2028 /// 2029 /// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>(); 2030 /// 2031 /// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty); 2032 /// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry); 2033 /// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1)); 2034 /// 2035 /// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry); 2036 /// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1)); 2037 /// ``` 2038 #[must_use] 2039 #[derive(PartialEq, Eq, Copy, Clone)] 2040 pub enum Steal<T> { 2041 /// The queue was empty at the time of stealing. 2042 Empty, 2043 2044 /// At least one task was successfully stolen. 2045 Success(T), 2046 2047 /// The steal operation needs to be retried. 2048 Retry, 2049 } 2050 2051 impl<T> Steal<T> { 2052 /// Returns `true` if the queue was empty at the time of stealing. 2053 /// 2054 /// # Examples 2055 /// 2056 /// ``` 2057 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2058 /// 2059 /// assert!(!Success(7).is_empty()); 2060 /// assert!(!Retry::<i32>.is_empty()); 2061 /// 2062 /// assert!(Empty::<i32>.is_empty()); 2063 /// ``` is_empty(&self) -> bool2064 pub fn is_empty(&self) -> bool { 2065 match self { 2066 Steal::Empty => true, 2067 _ => false, 2068 } 2069 } 2070 2071 /// Returns `true` if at least one task was stolen. 2072 /// 2073 /// # Examples 2074 /// 2075 /// ``` 2076 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2077 /// 2078 /// assert!(!Empty::<i32>.is_success()); 2079 /// assert!(!Retry::<i32>.is_success()); 2080 /// 2081 /// assert!(Success(7).is_success()); 2082 /// ``` is_success(&self) -> bool2083 pub fn is_success(&self) -> bool { 2084 match self { 2085 Steal::Success(_) => true, 2086 _ => false, 2087 } 2088 } 2089 2090 /// Returns `true` if the steal operation needs to be retried. 2091 /// 2092 /// # Examples 2093 /// 2094 /// ``` 2095 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2096 /// 2097 /// assert!(!Empty::<i32>.is_retry()); 2098 /// assert!(!Success(7).is_retry()); 2099 /// 2100 /// assert!(Retry::<i32>.is_retry()); 2101 /// ``` is_retry(&self) -> bool2102 pub fn is_retry(&self) -> bool { 2103 match self { 2104 Steal::Retry => true, 2105 _ => false, 2106 } 2107 } 2108 2109 /// Returns the result of the operation, if successful. 2110 /// 2111 /// # Examples 2112 /// 2113 /// ``` 2114 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2115 /// 2116 /// assert_eq!(Empty::<i32>.success(), None); 2117 /// assert_eq!(Retry::<i32>.success(), None); 2118 /// 2119 /// assert_eq!(Success(7).success(), Some(7)); 2120 /// ``` success(self) -> Option<T>2121 pub fn success(self) -> Option<T> { 2122 match self { 2123 Steal::Success(res) => Some(res), 2124 _ => None, 2125 } 2126 } 2127 2128 /// If no task was stolen, attempts another steal operation. 2129 /// 2130 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then: 2131 /// 2132 /// * If the second steal resulted in `Success`, it is returned. 2133 /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned. 2134 /// * If both resulted in `None`, then `None` is returned. 2135 /// 2136 /// # Examples 2137 /// 2138 /// ``` 2139 /// use crossbeam_deque::Steal::{Empty, Retry, Success}; 2140 /// 2141 /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1)); 2142 /// assert_eq!(Retry.or_else(|| Success(2)), Success(2)); 2143 /// 2144 /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>); 2145 /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>); 2146 /// 2147 /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>); 2148 /// ``` or_else<F>(self, f: F) -> Steal<T> where F: FnOnce() -> Steal<T>,2149 pub fn or_else<F>(self, f: F) -> Steal<T> 2150 where 2151 F: FnOnce() -> Steal<T>, 2152 { 2153 match self { 2154 Steal::Empty => f(), 2155 Steal::Success(_) => self, 2156 Steal::Retry => { 2157 if let Steal::Success(res) = f() { 2158 Steal::Success(res) 2159 } else { 2160 Steal::Retry 2161 } 2162 } 2163 } 2164 } 2165 } 2166 2167 impl<T> fmt::Debug for Steal<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result2168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 2169 match self { 2170 Steal::Empty => f.pad("Empty"), 2171 Steal::Success(_) => f.pad("Success(..)"), 2172 Steal::Retry => f.pad("Retry"), 2173 } 2174 } 2175 } 2176 2177 impl<T> FromIterator<Steal<T>> for Steal<T> { 2178 /// Consumes items until a `Success` is found and returns it. 2179 /// 2180 /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`. 2181 /// Otherwise, `Empty` is returned. from_iter<I>(iter: I) -> Steal<T> where I: IntoIterator<Item = Steal<T>>,2182 fn from_iter<I>(iter: I) -> Steal<T> 2183 where 2184 I: IntoIterator<Item = Steal<T>>, 2185 { 2186 let mut retry = false; 2187 for s in iter { 2188 match &s { 2189 Steal::Empty => {} 2190 Steal::Success(_) => return s, 2191 Steal::Retry => retry = true, 2192 } 2193 } 2194 2195 if retry { 2196 Steal::Retry 2197 } else { 2198 Steal::Empty 2199 } 2200 } 2201 } 2202