1 use std::cell::{Cell, UnsafeCell};
2 use std::cmp;
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem::{self, MaybeUninit};
6 use std::ptr;
7 use std::slice;
8 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9 use std::sync::Arc;
10 
11 use crossbeam_epoch::{self as epoch, Atomic, Owned};
12 use crossbeam_utils::{Backoff, CachePadded};
13 
14 // Minimum buffer capacity.
15 const MIN_CAP: usize = 64;
16 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17 const MAX_BATCH: usize = 32;
18 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19 // deallocated as soon as possible.
20 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21 
22 /// A buffer that holds tasks in a worker queue.
23 ///
24 /// This is just a pointer to the buffer and its capacity; dropping an instance of this struct will
25 /// *not* deallocate the buffer.
26 struct Buffer<T> {
27     /// Pointer to the allocated memory.
28     ptr: *mut T,
29 
30     /// Capacity of the buffer. Always a power of two.
31     cap: usize,
32 }
33 
34 unsafe impl<T> Send for Buffer<T> {}
35 
36 impl<T> Buffer<T> {
37     /// Allocates a new buffer with the specified capacity.
38     fn alloc(cap: usize) -> Buffer<T> {
39         debug_assert_eq!(cap, cap.next_power_of_two());
40 
41         let ptr = Box::into_raw(
42             (0..cap)
43                 .map(|_| MaybeUninit::<T>::uninit())
44                 .collect::<Box<[_]>>(),
45         )
46         .cast::<T>();
47 
48         Buffer { ptr, cap }
49     }
50 
51     /// Deallocates the buffer.
52     unsafe fn dealloc(self) {
53         drop(Box::from_raw(slice::from_raw_parts_mut(
54             self.ptr.cast::<MaybeUninit<T>>(),
55             self.cap,
56         )));
57     }
58 
59     /// Returns a pointer to the task at the specified `index`.
60     unsafe fn at(&self, index: isize) -> *mut T {
61         // `self.cap` is always a power of two.
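        // Since `cap` is a power of two, masking with `cap - 1` wraps the index into the buffer;
        // e.g. with `cap = 64`, index 70 maps to slot 6.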
62         // We do all the loads as `MaybeUninit` because we might realize, after loading, that we
63         // don't actually have the right to access this memory.
64         self.ptr.offset(index & (self.cap - 1) as isize)
65     }
66 
67     /// Writes `task` into the specified `index`.
68     ///
69     /// This method might be concurrently called with another `read` at the same index, which is
70     /// technically speaking a data race and therefore UB. We should use an atomic store here, but
71     /// that would be more expensive and difficult to implement generically for all types `T`.
72     /// Hence, as a hack, we use a volatile write instead.
73     unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
74         ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
75     }
76 
77     /// Reads a task from the specified `index`.
78     ///
79     /// This method might be concurrently called with another `write` at the same index, which is
80     /// technically speaking a data race and therefore UB. We should use an atomic load here, but
81     /// that would be more expensive and difficult to implement generically for all types `T`.
82     /// Hence, as a hack, we use a volatile load instead.
83     unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
84         ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
85     }
86 }
87 
88 impl<T> Clone for Buffer<T> {
89     fn clone(&self) -> Buffer<T> {
90         *self
91     }
92 }
93 
94 impl<T> Copy for Buffer<T> {}
95 
96 /// Internal queue data shared between the worker and stealers.
97 ///
98 /// The implementation is based on the following work:
99 ///
100 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
101 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
102 ///    PPoPP 2013.][weak-mem]
103 /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
104 ///    atomics. OOPSLA 2013.][checker]
105 ///
106 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
107 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
108 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514
109 struct Inner<T> {
110     /// The front index.
111     front: AtomicIsize,
112 
113     /// The back index.
114     back: AtomicIsize,
115 
116     /// The underlying buffer.
117     buffer: CachePadded<Atomic<Buffer<T>>>,
118 }
119 
120 impl<T> Drop for Inner<T> {
121     fn drop(&mut self) {
122         // Load the back index, front index, and buffer.
123         let b = *self.back.get_mut();
124         let f = *self.front.get_mut();
125 
126         unsafe {
127             let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
128 
129             // Go through the buffer from front to back and drop all tasks in the queue.
130             let mut i = f;
131             while i != b {
132                 buffer.deref().at(i).drop_in_place();
133                 i = i.wrapping_add(1);
134             }
135 
136             // Free the memory allocated by the buffer.
137             buffer.into_owned().into_box().dealloc();
138         }
139     }
140 }
141 
142 /// Worker queue flavor: FIFO or LIFO.
143 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
144 enum Flavor {
145     /// The first-in first-out flavor.
146     Fifo,
147 
148     /// The last-in first-out flavor.
149     Lifo,
150 }
151 
152 /// A worker queue.
153 ///
154 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
155 /// tasks from it. Task schedulers typically create a single worker queue per thread.
156 ///
157 /// # Examples
158 ///
159 /// A FIFO worker:
160 ///
161 /// ```
162 /// use crossbeam_deque::{Steal, Worker};
163 ///
164 /// let w = Worker::new_fifo();
165 /// let s = w.stealer();
166 ///
167 /// w.push(1);
168 /// w.push(2);
169 /// w.push(3);
170 ///
171 /// assert_eq!(s.steal(), Steal::Success(1));
172 /// assert_eq!(w.pop(), Some(2));
173 /// assert_eq!(w.pop(), Some(3));
174 /// ```
175 ///
176 /// A LIFO worker:
177 ///
178 /// ```
179 /// use crossbeam_deque::{Steal, Worker};
180 ///
181 /// let w = Worker::new_lifo();
182 /// let s = w.stealer();
183 ///
184 /// w.push(1);
185 /// w.push(2);
186 /// w.push(3);
187 ///
188 /// assert_eq!(s.steal(), Steal::Success(1));
189 /// assert_eq!(w.pop(), Some(3));
190 /// assert_eq!(w.pop(), Some(2));
191 /// ```
192 pub struct Worker<T> {
193     /// A reference to the inner representation of the queue.
194     inner: Arc<CachePadded<Inner<T>>>,
195 
196     /// A copy of `inner.buffer` for quick access.
197     buffer: Cell<Buffer<T>>,
198 
199     /// The flavor of the queue.
200     flavor: Flavor,
201 
202     /// Indicates that the worker cannot be shared among threads.
203     _marker: PhantomData<*mut ()>, // !Send + !Sync
204 }
205 
206 unsafe impl<T: Send> Send for Worker<T> {}
207 
208 impl<T> Worker<T> {
209     /// Creates a FIFO worker queue.
210     ///
211     /// Tasks are pushed and popped from opposite ends.
212     ///
213     /// # Examples
214     ///
215     /// ```
216     /// use crossbeam_deque::Worker;
217     ///
218     /// let w = Worker::<i32>::new_fifo();
219     /// ```
220     pub fn new_fifo() -> Worker<T> {
221         let buffer = Buffer::alloc(MIN_CAP);
222 
223         let inner = Arc::new(CachePadded::new(Inner {
224             front: AtomicIsize::new(0),
225             back: AtomicIsize::new(0),
226             buffer: CachePadded::new(Atomic::new(buffer)),
227         }));
228 
229         Worker {
230             inner,
231             buffer: Cell::new(buffer),
232             flavor: Flavor::Fifo,
233             _marker: PhantomData,
234         }
235     }
236 
237     /// Creates a LIFO worker queue.
238     ///
239     /// Tasks are pushed and popped from the same end.
240     ///
241     /// # Examples
242     ///
243     /// ```
244     /// use crossbeam_deque::Worker;
245     ///
246     /// let w = Worker::<i32>::new_lifo();
247     /// ```
248     pub fn new_lifo() -> Worker<T> {
249         let buffer = Buffer::alloc(MIN_CAP);
250 
251         let inner = Arc::new(CachePadded::new(Inner {
252             front: AtomicIsize::new(0),
253             back: AtomicIsize::new(0),
254             buffer: CachePadded::new(Atomic::new(buffer)),
255         }));
256 
257         Worker {
258             inner,
259             buffer: Cell::new(buffer),
260             flavor: Flavor::Lifo,
261             _marker: PhantomData,
262         }
263     }
264 
265     /// Creates a stealer for this queue.
266     ///
267     /// The returned stealer can be shared among threads and cloned.
268     ///
269     /// # Examples
270     ///
271     /// ```
272     /// use crossbeam_deque::Worker;
273     ///
274     /// let w = Worker::<i32>::new_lifo();
275     /// let s = w.stealer();
276     /// ```
277     pub fn stealer(&self) -> Stealer<T> {
278         Stealer {
279             inner: self.inner.clone(),
280             flavor: self.flavor,
281         }
282     }
283 
284     /// Resizes the internal buffer to the new capacity of `new_cap`.
285     #[cold]
286     unsafe fn resize(&self, new_cap: usize) {
287         // Load the back index, front index, and buffer.
288         let b = self.inner.back.load(Ordering::Relaxed);
289         let f = self.inner.front.load(Ordering::Relaxed);
290         let buffer = self.buffer.get();
291 
292         // Allocate a new buffer and copy data from the old buffer to the new one.
293         let new = Buffer::alloc(new_cap);
294         let mut i = f;
295         while i != b {
296             ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
297             i = i.wrapping_add(1);
298         }
299 
300         let guard = &epoch::pin();
301 
302         // Replace the old buffer with the new one.
303         self.buffer.replace(new);
304         let old =
305             self.inner
306                 .buffer
307                 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
308 
309         // Destroy the old buffer later.
310         guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
311 
312         // If the buffer is very large, then flush the thread-local garbage in order to deallocate
313         // it as soon as possible.
314         if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
315             guard.flush();
316         }
317     }
318 
319     /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
320     /// buffer.
321     fn reserve(&self, reserve_cap: usize) {
322         if reserve_cap > 0 {
323             // Compute the current length.
324             let b = self.inner.back.load(Ordering::Relaxed);
325             let f = self.inner.front.load(Ordering::SeqCst);
326             let len = b.wrapping_sub(f) as usize;
327 
328             // The current capacity.
329             let cap = self.buffer.get().cap;
330 
331             // Is there enough capacity to push `reserve_cap` tasks?
332             if cap - len < reserve_cap {
333                 // Keep doubling the capacity as much as is needed.
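                // For example, with `cap = 64`, `len = 10` and `reserve_cap = 200`, the capacity grows
                // to 256, the first power of two for which `new_cap - len >= reserve_cap`.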
334                 let mut new_cap = cap * 2;
335                 while new_cap - len < reserve_cap {
336                     new_cap *= 2;
337                 }
338 
339                 // Resize the buffer.
340                 unsafe {
341                     self.resize(new_cap);
342                 }
343             }
344         }
345     }
346 
347     /// Returns `true` if the queue is empty.
348     ///
349     /// ```
350     /// use crossbeam_deque::Worker;
351     ///
352     /// let w = Worker::new_lifo();
353     ///
354     /// assert!(w.is_empty());
355     /// w.push(1);
356     /// assert!(!w.is_empty());
357     /// ```
358     pub fn is_empty(&self) -> bool {
359         let b = self.inner.back.load(Ordering::Relaxed);
360         let f = self.inner.front.load(Ordering::SeqCst);
361         b.wrapping_sub(f) <= 0
362     }
363 
364     /// Returns the number of tasks in the deque.
365     ///
366     /// ```
367     /// use crossbeam_deque::Worker;
368     ///
369     /// let w = Worker::new_lifo();
370     ///
371     /// assert_eq!(w.len(), 0);
372     /// w.push(1);
373     /// assert_eq!(w.len(), 1);
374     /// w.push(1);
375     /// assert_eq!(w.len(), 2);
376     /// ```
377     pub fn len(&self) -> usize {
378         let b = self.inner.back.load(Ordering::Relaxed);
379         let f = self.inner.front.load(Ordering::SeqCst);
380         b.wrapping_sub(f).max(0) as usize
381     }
382 
383     /// Pushes a task into the queue.
384     ///
385     /// # Examples
386     ///
387     /// ```
388     /// use crossbeam_deque::Worker;
389     ///
390     /// let w = Worker::new_lifo();
391     /// w.push(1);
392     /// w.push(2);
393     /// ```
394     pub fn push(&self, task: T) {
395         // Load the back index, front index, and buffer.
396         let b = self.inner.back.load(Ordering::Relaxed);
397         let f = self.inner.front.load(Ordering::Acquire);
398         let mut buffer = self.buffer.get();
399 
400         // Calculate the length of the queue.
401         let len = b.wrapping_sub(f);
402 
403         // Is the queue full?
404         if len >= buffer.cap as isize {
405             // Yes. Grow the underlying buffer.
406             unsafe {
407                 self.resize(2 * buffer.cap);
408             }
409             buffer = self.buffer.get();
410         }
411 
412         // Write `task` into the slot.
413         unsafe {
414             buffer.write(b, MaybeUninit::new(task));
415         }
416 
417         atomic::fence(Ordering::Release);
418 
419         // Increment the back index.
420         //
421         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
422         // races because it doesn't understand fences.
423         self.inner.back.store(b.wrapping_add(1), Ordering::Release);
424     }
425 
426     /// Pops a task from the queue.
427     ///
428     /// # Examples
429     ///
430     /// ```
431     /// use crossbeam_deque::Worker;
432     ///
433     /// let w = Worker::new_fifo();
434     /// w.push(1);
435     /// w.push(2);
436     ///
437     /// assert_eq!(w.pop(), Some(1));
438     /// assert_eq!(w.pop(), Some(2));
439     /// assert_eq!(w.pop(), None);
440     /// ```
441     pub fn pop(&self) -> Option<T> {
442         // Load the back and front index.
443         let b = self.inner.back.load(Ordering::Relaxed);
444         let f = self.inner.front.load(Ordering::Relaxed);
445 
446         // Calculate the length of the queue.
447         let len = b.wrapping_sub(f);
448 
449         // Is the queue empty?
450         if len <= 0 {
451             return None;
452         }
453 
454         match self.flavor {
455             // Pop from the front of the queue.
456             Flavor::Fifo => {
457                 // Try incrementing the front index to pop the task.
458                 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
459                 let new_f = f.wrapping_add(1);
460 
461                 if b.wrapping_sub(new_f) < 0 {
462                     self.inner.front.store(f, Ordering::Relaxed);
463                     return None;
464                 }
465 
466                 unsafe {
467                     // Read the popped task.
468                     let buffer = self.buffer.get();
469                     let task = buffer.read(f).assume_init();
470 
471                     // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
472                     if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
473                         self.resize(buffer.cap / 2);
474                     }
475 
476                     Some(task)
477                 }
478             }
479 
480             // Pop from the back of the queue.
481             Flavor::Lifo => {
482                 // Decrement the back index.
483                 let b = b.wrapping_sub(1);
484                 self.inner.back.store(b, Ordering::Relaxed);
485 
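                // Publish the decremented `back` before loading `front`, so that a concurrent steal
                // racing for the last task is arbitrated by the compare-exchange on `front` below
                // (see the Le et al. paper referenced in the docs for `Inner`).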
486                 atomic::fence(Ordering::SeqCst);
487 
488                 // Load the front index.
489                 let f = self.inner.front.load(Ordering::Relaxed);
490 
491                 // Compute the length after the back index was decremented.
492                 let len = b.wrapping_sub(f);
493 
494                 if len < 0 {
495                     // The queue is empty. Restore the back index to the original value.
496                     self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
497                     None
498                 } else {
499                     // Read the task to be popped.
500                     let buffer = self.buffer.get();
501                     let mut task = unsafe { Some(buffer.read(b)) };
502 
503                     // Are we popping the last task from the queue?
504                     if len == 0 {
505                         // Try incrementing the front index.
506                         if self
507                             .inner
508                             .front
509                             .compare_exchange(
510                                 f,
511                                 f.wrapping_add(1),
512                                 Ordering::SeqCst,
513                                 Ordering::Relaxed,
514                             )
515                             .is_err()
516                         {
517                             // Failed. We didn't pop anything. Reset to `None`.
518                             task.take();
519                         }
520 
521                         // Restore the back index to the original value.
522                         self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
523                     } else {
524                         // Shrink the buffer if `len` is less than one fourth of the capacity.
525                         if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
526                             unsafe {
527                                 self.resize(buffer.cap / 2);
528                             }
529                         }
530                     }
531 
532                     task.map(|t| unsafe { t.assume_init() })
533                 }
534             }
535         }
536     }
537 }
538 
539 impl<T> fmt::Debug for Worker<T> {
540     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
541         f.pad("Worker { .. }")
542     }
543 }
544 
545 /// A stealer handle of a worker queue.
546 ///
547 /// Stealers can be shared among threads.
548 ///
549 /// Task schedulers typically have a single worker queue per worker thread.
550 ///
551 /// # Examples
552 ///
553 /// ```
554 /// use crossbeam_deque::{Steal, Worker};
555 ///
556 /// let w = Worker::new_lifo();
557 /// w.push(1);
558 /// w.push(2);
559 ///
560 /// let s = w.stealer();
561 /// assert_eq!(s.steal(), Steal::Success(1));
562 /// assert_eq!(s.steal(), Steal::Success(2));
563 /// assert_eq!(s.steal(), Steal::Empty);
564 /// ```
565 pub struct Stealer<T> {
566     /// A reference to the inner representation of the queue.
567     inner: Arc<CachePadded<Inner<T>>>,
568 
569     /// The flavor of the queue.
570     flavor: Flavor,
571 }
572 
573 unsafe impl<T: Send> Send for Stealer<T> {}
574 unsafe impl<T: Send> Sync for Stealer<T> {}
575 
576 impl<T> Stealer<T> {
577     /// Returns `true` if the queue is empty.
578     ///
579     /// ```
580     /// use crossbeam_deque::Worker;
581     ///
582     /// let w = Worker::new_lifo();
583     /// let s = w.stealer();
584     ///
585     /// assert!(s.is_empty());
586     /// w.push(1);
587     /// assert!(!s.is_empty());
588     /// ```
589     pub fn is_empty(&self) -> bool {
590         let f = self.inner.front.load(Ordering::Acquire);
591         atomic::fence(Ordering::SeqCst);
592         let b = self.inner.back.load(Ordering::Acquire);
593         b.wrapping_sub(f) <= 0
594     }
595 
596     /// Returns the number of tasks in the deque.
597     ///
598     /// ```
599     /// use crossbeam_deque::Worker;
600     ///
601     /// let w = Worker::new_lifo();
602     /// let s = w.stealer();
603     ///
604     /// assert_eq!(s.len(), 0);
605     /// w.push(1);
606     /// assert_eq!(s.len(), 1);
607     /// w.push(2);
608     /// assert_eq!(s.len(), 2);
609     /// ```
610     pub fn len(&self) -> usize {
611         let f = self.inner.front.load(Ordering::Acquire);
612         atomic::fence(Ordering::SeqCst);
613         let b = self.inner.back.load(Ordering::Acquire);
614         b.wrapping_sub(f).max(0) as usize
615     }
616 
617     /// Steals a task from the queue.
618     ///
619     /// # Examples
620     ///
621     /// ```
622     /// use crossbeam_deque::{Steal, Worker};
623     ///
624     /// let w = Worker::new_lifo();
625     /// w.push(1);
626     /// w.push(2);
627     ///
628     /// let s = w.stealer();
629     /// assert_eq!(s.steal(), Steal::Success(1));
630     /// assert_eq!(s.steal(), Steal::Success(2));
631     /// ```
632     pub fn steal(&self) -> Steal<T> {
633         // Load the front index.
634         let f = self.inner.front.load(Ordering::Acquire);
635 
636         // A SeqCst fence is needed here.
637         //
638         // If the current thread is already pinned (reentrantly), we must manually issue the
639         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
640         // have to.
641         if epoch::is_pinned() {
642             atomic::fence(Ordering::SeqCst);
643         }
644 
645         let guard = &epoch::pin();
646 
647         // Load the back index.
648         let b = self.inner.back.load(Ordering::Acquire);
649 
650         // Is the queue empty?
651         if b.wrapping_sub(f) <= 0 {
652             return Steal::Empty;
653         }
654 
655         // Load the buffer and read the task at the front.
656         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
657         let task = unsafe { buffer.deref().read(f) };
658 
659         // Try incrementing the front index to steal the task.
660         // If the buffer has been swapped or the increment fails, we retry.
661         if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
662             || self
663                 .inner
664                 .front
665                 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
666                 .is_err()
667         {
668             // We didn't steal this task, forget it.
669             return Steal::Retry;
670         }
671 
672         // Return the stolen task.
673         Steal::Success(unsafe { task.assume_init() })
674     }
675 
676     /// Steals a batch of tasks and pushes them into another worker.
677     ///
678     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
679     /// steal around half of the tasks in the queue, but also not more than some constant limit.
680     ///
681     /// # Examples
682     ///
683     /// ```
684     /// use crossbeam_deque::Worker;
685     ///
686     /// let w1 = Worker::new_fifo();
687     /// w1.push(1);
688     /// w1.push(2);
689     /// w1.push(3);
690     /// w1.push(4);
691     ///
692     /// let s = w1.stealer();
693     /// let w2 = Worker::new_fifo();
694     ///
695     /// let _ = s.steal_batch(&w2);
696     /// assert_eq!(w2.pop(), Some(1));
697     /// assert_eq!(w2.pop(), Some(2));
698     /// ```
699     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
700         self.steal_batch_with_limit(dest, MAX_BATCH)
701     }
702 
703     /// Steals no more than `limit` tasks and pushes them into another worker.
704     ///
705     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
706     /// steal around half of the tasks in the queue, but also not more than the given limit.
707     ///
708     /// # Examples
709     ///
710     /// ```
711     /// use crossbeam_deque::Worker;
712     ///
713     /// let w1 = Worker::new_fifo();
714     /// w1.push(1);
715     /// w1.push(2);
716     /// w1.push(3);
717     /// w1.push(4);
718     /// w1.push(5);
719     /// w1.push(6);
720     ///
721     /// let s = w1.stealer();
722     /// let w2 = Worker::new_fifo();
723     ///
724     /// let _ = s.steal_batch_with_limit(&w2, 2);
725     /// assert_eq!(w2.pop(), Some(1));
726     /// assert_eq!(w2.pop(), Some(2));
727     /// assert_eq!(w2.pop(), None);
728     ///
729     /// w1.push(7);
730     /// w1.push(8);
731     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
732     /// // half of the elements are currently popped, but the number of popped elements is considered
733     /// // an implementation detail that may be changed in the future.
734     /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
735     /// assert_eq!(w2.len(), 3);
736     /// ```
737     pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
738         assert!(limit > 0);
739         if Arc::ptr_eq(&self.inner, &dest.inner) {
740             if dest.is_empty() {
741                 return Steal::Empty;
742             } else {
743                 return Steal::Success(());
744             }
745         }
746 
747         // Load the front index.
748         let mut f = self.inner.front.load(Ordering::Acquire);
749 
750         // A SeqCst fence is needed here.
751         //
752         // If the current thread is already pinned (reentrantly), we must manually issue the
753         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
754         // have to.
755         if epoch::is_pinned() {
756             atomic::fence(Ordering::SeqCst);
757         }
758 
759         let guard = &epoch::pin();
760 
761         // Load the back index.
762         let b = self.inner.back.load(Ordering::Acquire);
763 
764         // Is the queue empty?
765         let len = b.wrapping_sub(f);
766         if len <= 0 {
767             return Steal::Empty;
768         }
769 
770         // Reserve capacity for the stolen batch.
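        // Steal roughly half of the available tasks (rounding up), capped at `limit`; e.g. with 5
        // tasks and a large enough `limit`, 3 tasks are stolen.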
771         let batch_size = cmp::min((len as usize + 1) / 2, limit);
772         dest.reserve(batch_size);
773         let mut batch_size = batch_size as isize;
774 
775         // Get the destination buffer and back index.
776         let dest_buffer = dest.buffer.get();
777         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
778 
779         // Load the buffer.
780         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
781 
782         match self.flavor {
783             // Steal a batch of tasks from the front at once.
784             Flavor::Fifo => {
785                 // Copy the batch from the source to the destination buffer.
786                 match dest.flavor {
787                     Flavor::Fifo => {
788                         for i in 0..batch_size {
789                             unsafe {
790                                 let task = buffer.deref().read(f.wrapping_add(i));
791                                 dest_buffer.write(dest_b.wrapping_add(i), task);
792                             }
793                         }
794                     }
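                    // For a LIFO destination, write the batch in reverse so that the oldest stolen
                    // task ends up at the back of `dest` and is popped first.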
795                     Flavor::Lifo => {
796                         for i in 0..batch_size {
797                             unsafe {
798                                 let task = buffer.deref().read(f.wrapping_add(i));
799                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
800                             }
801                         }
802                     }
803                 }
804 
805                 // Try incrementing the front index to steal the batch.
806                 // If the buffer has been swapped or the increment fails, we retry.
807                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
808                     || self
809                         .inner
810                         .front
811                         .compare_exchange(
812                             f,
813                             f.wrapping_add(batch_size),
814                             Ordering::SeqCst,
815                             Ordering::Relaxed,
816                         )
817                         .is_err()
818                 {
819                     return Steal::Retry;
820                 }
821 
822                 dest_b = dest_b.wrapping_add(batch_size);
823             }
824 
825             // Steal a batch of tasks from the front one by one.
826             Flavor::Lifo => {
827                 // This loop may modify the batch_size, which triggers a clippy lint warning.
828                 // Use a new variable to avoid the warning, and to make it clear we aren't
829                 // modifying the loop exit condition during iteration.
830                 let original_batch_size = batch_size;
831 
832                 for i in 0..original_batch_size {
833                     // If this is not the first steal, check whether the queue is empty.
834                     if i > 0 {
835                         // We've already got the current front index. Now execute the fence to
836                         // synchronize with other threads.
837                         atomic::fence(Ordering::SeqCst);
838 
839                         // Load the back index.
840                         let b = self.inner.back.load(Ordering::Acquire);
841 
842                         // Is the queue empty?
843                         if b.wrapping_sub(f) <= 0 {
844                             batch_size = i;
845                             break;
846                         }
847                     }
848 
849                     // Read the task at the front.
850                     let task = unsafe { buffer.deref().read(f) };
851 
852                     // Try incrementing the front index to steal the task.
853                     // If the buffer has been swapped or the increment fails, we retry.
854                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
855                         || self
856                             .inner
857                             .front
858                             .compare_exchange(
859                                 f,
860                                 f.wrapping_add(1),
861                                 Ordering::SeqCst,
862                                 Ordering::Relaxed,
863                             )
864                             .is_err()
865                     {
866                         // We didn't steal this task, forget it and break from the loop.
867                         batch_size = i;
868                         break;
869                     }
870 
871                     // Write the stolen task into the destination buffer.
872                     unsafe {
873                         dest_buffer.write(dest_b, task);
874                     }
875 
876                     // Move the source front index and the destination back index one step forward.
877                     f = f.wrapping_add(1);
878                     dest_b = dest_b.wrapping_add(1);
879                 }
880 
881                 // If we didn't steal anything, the operation needs to be retried.
882                 if batch_size == 0 {
883                     return Steal::Retry;
884                 }
885 
886                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
887                 if dest.flavor == Flavor::Fifo {
888                     for i in 0..batch_size / 2 {
889                         unsafe {
890                             let i1 = dest_b.wrapping_sub(batch_size - i);
891                             let i2 = dest_b.wrapping_sub(i + 1);
892                             let t1 = dest_buffer.read(i1);
893                             let t2 = dest_buffer.read(i2);
894                             dest_buffer.write(i1, t2);
895                             dest_buffer.write(i2, t1);
896                         }
897                     }
898                 }
899             }
900         }
901 
902         atomic::fence(Ordering::Release);
903 
904         // Update the back index in the destination queue.
905         //
906         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
907         // races because it doesn't understand fences.
908         dest.inner.back.store(dest_b, Ordering::Release);
909 
910         // Return with success.
911         Steal::Success(())
912     }
913 
914     /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
915     ///
916     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
917     /// steal around half of the tasks in the queue, but also not more than some constant limit.
918     ///
919     /// # Examples
920     ///
921     /// ```
922     /// use crossbeam_deque::{Steal, Worker};
923     ///
924     /// let w1 = Worker::new_fifo();
925     /// w1.push(1);
926     /// w1.push(2);
927     /// w1.push(3);
928     /// w1.push(4);
929     ///
930     /// let s = w1.stealer();
931     /// let w2 = Worker::new_fifo();
932     ///
933     /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
934     /// assert_eq!(w2.pop(), Some(2));
935     /// ```
936     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
937         self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
938     }
939 
940     /// Steals no more than `limit` tasks, pushes them into another worker, and pops a task from
941     /// that worker.
942     ///
943     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
944     /// steal around half of the tasks in the queue, but also not more than the given limit.
945     ///
946     /// # Examples
947     ///
948     /// ```
949     /// use crossbeam_deque::{Steal, Worker};
950     ///
951     /// let w1 = Worker::new_fifo();
952     /// w1.push(1);
953     /// w1.push(2);
954     /// w1.push(3);
955     /// w1.push(4);
956     /// w1.push(5);
957     /// w1.push(6);
958     ///
959     /// let s = w1.stealer();
960     /// let w2 = Worker::new_fifo();
961     ///
962     /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
963     /// assert_eq!(w2.pop(), Some(2));
964     /// assert_eq!(w2.pop(), None);
965     ///
966     /// w1.push(7);
967     /// w1.push(8);
968     /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
969     /// // half of the elements are currently popped, but the number of popped elements is considered
970     /// // an implementation detail that may be changed in the future.
971     /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
972     /// assert_eq!(w2.pop(), Some(4));
973     /// assert_eq!(w2.pop(), Some(5));
974     /// assert_eq!(w2.pop(), None);
975     /// ```
976     pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
977         assert!(limit > 0);
978         if Arc::ptr_eq(&self.inner, &dest.inner) {
979             match dest.pop() {
980                 None => return Steal::Empty,
981                 Some(task) => return Steal::Success(task),
982             }
983         }
984 
985         // Load the front index.
986         let mut f = self.inner.front.load(Ordering::Acquire);
987 
988         // A SeqCst fence is needed here.
989         //
990         // If the current thread is already pinned (reentrantly), we must manually issue the
991         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
992         // have to.
993         if epoch::is_pinned() {
994             atomic::fence(Ordering::SeqCst);
995         }
996 
997         let guard = &epoch::pin();
998 
999         // Load the back index.
1000         let b = self.inner.back.load(Ordering::Acquire);
1001 
1002         // Is the queue empty?
1003         let len = b.wrapping_sub(f);
1004         if len <= 0 {
1005             return Steal::Empty;
1006         }
1007 
1008         // Reserve capacity for the stolen batch.
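        // One task is returned directly rather than pushed into `dest`, so at most half of the
        // remaining `len - 1` tasks are stolen, and no more than `limit - 1`.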
1009         let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1010         dest.reserve(batch_size);
1011         let mut batch_size = batch_size as isize;
1012 
1013         // Get the destination buffer and back index.
1014         let dest_buffer = dest.buffer.get();
1015         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1016 
1017         // Load the buffer.
1018         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1019 
1020         // Read the task at the front.
1021         let mut task = unsafe { buffer.deref().read(f) };
1022 
1023         match self.flavor {
1024             // Steal a batch of tasks from the front at once.
1025             Flavor::Fifo => {
1026                 // Copy the batch from the source to the destination buffer.
1027                 match dest.flavor {
1028                     Flavor::Fifo => {
1029                         for i in 0..batch_size {
1030                             unsafe {
1031                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
1032                                 dest_buffer.write(dest_b.wrapping_add(i), task);
1033                             }
1034                         }
1035                     }
1036                     Flavor::Lifo => {
1037                         for i in 0..batch_size {
1038                             unsafe {
1039                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
1040                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1041                             }
1042                         }
1043                     }
1044                 }
1045 
1046                 // Try incrementing the front index to steal the batch, along with the task at the front.
1047                 // If the buffer has been swapped or the increment fails, we retry.
1048                 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1049                     || self
1050                         .inner
1051                         .front
1052                         .compare_exchange(
1053                             f,
1054                             f.wrapping_add(batch_size + 1),
1055                             Ordering::SeqCst,
1056                             Ordering::Relaxed,
1057                         )
1058                         .is_err()
1059                 {
1060                     // We didn't steal this task, forget it.
1061                     return Steal::Retry;
1062                 }
1063 
1064                 dest_b = dest_b.wrapping_add(batch_size);
1065             }
1066 
1067             // Steal a batch of tasks from the front one by one.
1068             Flavor::Lifo => {
1069                 // Try incrementing the front index to steal the task.
1070                 if self
1071                     .inner
1072                     .front
1073                     .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1074                     .is_err()
1075                 {
1076                     // We didn't steal this task, forget it.
1077                     return Steal::Retry;
1078                 }
1079 
1080                 // Move the front index one step forward.
1081                 f = f.wrapping_add(1);
1082 
1083                 // Repeat the same procedure for the batch steals.
1084                 //
1085                 // This loop may modify the batch_size, which triggers a clippy lint warning.
1086                 // Use a new variable to avoid the warning, and to make it clear we aren't
1087                 // modifying the loop exit condition during iteration.
1088                 let original_batch_size = batch_size;
1089                 for i in 0..original_batch_size {
1090                     // We've already got the current front index. Now execute the fence to
1091                     // synchronize with other threads.
1092                     atomic::fence(Ordering::SeqCst);
1093 
1094                     // Load the back index.
1095                     let b = self.inner.back.load(Ordering::Acquire);
1096 
1097                     // Is the queue empty?
1098                     if b.wrapping_sub(f) <= 0 {
1099                         batch_size = i;
1100                         break;
1101                     }
1102 
1103                     // Read the task at the front.
1104                     let tmp = unsafe { buffer.deref().read(f) };
1105 
1106                     // Try incrementing the front index to steal the task.
1107                     // If the buffer has been swapped or the increment fails, we retry.
1108                     if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1109                         || self
1110                             .inner
1111                             .front
1112                             .compare_exchange(
1113                                 f,
1114                                 f.wrapping_add(1),
1115                                 Ordering::SeqCst,
1116                                 Ordering::Relaxed,
1117                             )
1118                             .is_err()
1119                     {
1120                         // We didn't steal this task, forget it and break from the loop.
1121                         batch_size = i;
1122                         break;
1123                     }
1124 
1125                     // Write the previously stolen task into the destination buffer.
1126                     unsafe {
1127                         dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1128                     }
1129 
1130                     // Move the source front index and the destination back index one step forward.
1131                     f = f.wrapping_add(1);
1132                     dest_b = dest_b.wrapping_add(1);
1133                 }
1134 
1135                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1136                 if dest.flavor == Flavor::Fifo {
1137                     for i in 0..batch_size / 2 {
1138                         unsafe {
1139                             let i1 = dest_b.wrapping_sub(batch_size - i);
1140                             let i2 = dest_b.wrapping_sub(i + 1);
1141                             let t1 = dest_buffer.read(i1);
1142                             let t2 = dest_buffer.read(i2);
1143                             dest_buffer.write(i1, t2);
1144                             dest_buffer.write(i2, t1);
1145                         }
1146                     }
1147                 }
1148             }
1149         }
1150 
1151         atomic::fence(Ordering::Release);
1152 
1153         // Update the back index in the destination queue.
1154         //
1155         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1156         // races because it doesn't understand fences.
1157         dest.inner.back.store(dest_b, Ordering::Release);
1158 
1159         // Return with success.
1160         Steal::Success(unsafe { task.assume_init() })
1161     }
1162 }
1163 
1164 impl<T> Clone for Stealer<T> {
1165     fn clone(&self) -> Stealer<T> {
1166         Stealer {
1167             inner: self.inner.clone(),
1168             flavor: self.flavor,
1169         }
1170     }
1171 }
1172 
1173 impl<T> fmt::Debug for Stealer<T> {
1174     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1175         f.pad("Stealer { .. }")
1176     }
1177 }
1178 
1179 // Bits indicating the state of a slot:
1180 // * If a task has been written into the slot, `WRITE` is set.
1181 // * If a task has been read from the slot, `READ` is set.
1182 // * If the block is being destroyed, `DESTROY` is set.
1183 const WRITE: usize = 1;
1184 const READ: usize = 2;
1185 const DESTROY: usize = 4;
1186 
1187 // Each block covers one "lap" of indices.
1188 const LAP: usize = 64;
1189 // The maximum number of values a block can hold.
1190 const BLOCK_CAP: usize = LAP - 1;
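// One offset value per lap is reserved as an end-of-block marker (`offset == BLOCK_CAP`), hence `LAP - 1`.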
1191 // How many lower bits are reserved for metadata.
1192 const SHIFT: usize = 1;
1193 // Indicates that the block is not the last one.
1194 const HAS_NEXT: usize = 1;
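// A queue index stores its slot position in the bits above `SHIFT`, so `(index >> SHIFT) % LAP` is the
// offset within the current block. On the head index, the lowest bit (`HAS_NEXT`) additionally records
// whether a next block is already known to exist.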
1195 
1196 /// A slot in a block.
1197 struct Slot<T> {
1198     /// The task.
1199     task: UnsafeCell<MaybeUninit<T>>,
1200 
1201     /// The state of the slot.
1202     state: AtomicUsize,
1203 }
1204 
1205 impl<T> Slot<T> {
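    /// An uninitialized slot, used to initialize the `slots` array in `Block::new`; array repeat
    /// expressions require a `const` item for non-`Copy` element types.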
1206     const UNINIT: Self = Self {
1207         task: UnsafeCell::new(MaybeUninit::uninit()),
1208         state: AtomicUsize::new(0),
1209     };
1210 
1211     /// Waits until a task is written into the slot.
1212     fn wait_write(&self) {
1213         let backoff = Backoff::new();
1214         while self.state.load(Ordering::Acquire) & WRITE == 0 {
1215             backoff.snooze();
1216         }
1217     }
1218 }
1219 
1220 /// A block in a linked list.
1221 ///
1222 /// Each block in the list can hold up to `BLOCK_CAP` values.
1223 struct Block<T> {
1224     /// The next block in the linked list.
1225     next: AtomicPtr<Block<T>>,
1226 
1227     /// Slots for values.
1228     slots: [Slot<T>; BLOCK_CAP],
1229 }
1230 
1231 impl<T> Block<T> {
1232     /// Creates an empty block.
1233     fn new() -> Block<T> {
1234         Self {
1235             next: AtomicPtr::new(ptr::null_mut()),
1236             slots: [Slot::UNINIT; BLOCK_CAP],
1237         }
1238     }
1239 
1240     /// Waits until the next pointer is set.
1241     fn wait_next(&self) -> *mut Block<T> {
1242         let backoff = Backoff::new();
1243         loop {
1244             let next = self.next.load(Ordering::Acquire);
1245             if !next.is_null() {
1246                 return next;
1247             }
1248             backoff.snooze();
1249         }
1250     }
1251 
1252     /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1253     unsafe fn destroy(this: *mut Block<T>, count: usize) {
1254         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1255         // begun destruction of the block.
1256         for i in (0..count).rev() {
1257             let slot = (*this).slots.get_unchecked(i);
1258 
1259             // Mark the `DESTROY` bit if a thread is still using the slot.
1260             if slot.state.load(Ordering::Acquire) & READ == 0
1261                 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1262             {
1263                 // If a thread is still using the slot, it will continue destruction of the block.
1264                 return;
1265             }
1266         }
1267 
1268         // No thread is using the block, now it is safe to destroy it.
1269         drop(Box::from_raw(this));
1270     }
1271 }
1272 
1273 /// A position in a queue.
1274 struct Position<T> {
1275     /// The index in the queue.
1276     index: AtomicUsize,
1277 
1278     /// The block in the linked list.
1279     block: AtomicPtr<Block<T>>,
1280 }
1281 
1282 /// An injector queue.
1283 ///
1284 /// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1285 /// a single injector queue, which is the entry point for new tasks.
1286 ///
1287 /// # Examples
1288 ///
1289 /// ```
1290 /// use crossbeam_deque::{Injector, Steal};
1291 ///
1292 /// let q = Injector::new();
1293 /// q.push(1);
1294 /// q.push(2);
1295 ///
1296 /// assert_eq!(q.steal(), Steal::Success(1));
1297 /// assert_eq!(q.steal(), Steal::Success(2));
1298 /// assert_eq!(q.steal(), Steal::Empty);
1299 /// ```
1300 pub struct Injector<T> {
1301     /// The head of the queue.
1302     head: CachePadded<Position<T>>,
1303 
1304     /// The tail of the queue.
1305     tail: CachePadded<Position<T>>,
1306 
1307     /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
1308     _marker: PhantomData<T>,
1309 }
1310 
1311 unsafe impl<T: Send> Send for Injector<T> {}
1312 unsafe impl<T: Send> Sync for Injector<T> {}
1313 
1314 impl<T> Default for Injector<T> {
1315     fn default() -> Self {
1316         let block = Box::into_raw(Box::new(Block::<T>::new()));
1317         Self {
1318             head: CachePadded::new(Position {
1319                 block: AtomicPtr::new(block),
1320                 index: AtomicUsize::new(0),
1321             }),
1322             tail: CachePadded::new(Position {
1323                 block: AtomicPtr::new(block),
1324                 index: AtomicUsize::new(0),
1325             }),
1326             _marker: PhantomData,
1327         }
1328     }
1329 }
1330 
1331 impl<T> Injector<T> {
1332     /// Creates a new injector queue.
1333     ///
1334     /// # Examples
1335     ///
1336     /// ```
1337     /// use crossbeam_deque::Injector;
1338     ///
1339     /// let q = Injector::<i32>::new();
1340     /// ```
1341     pub fn new() -> Injector<T> {
1342         Self::default()
1343     }
1344 
1345     /// Pushes a task into the queue.
1346     ///
1347     /// # Examples
1348     ///
1349     /// ```
1350     /// use crossbeam_deque::Injector;
1351     ///
1352     /// let w = Injector::new();
1353     /// w.push(1);
1354     /// w.push(2);
1355     /// ```
1356     pub fn push(&self, task: T) {
1357         let backoff = Backoff::new();
1358         let mut tail = self.tail.index.load(Ordering::Acquire);
1359         let mut block = self.tail.block.load(Ordering::Acquire);
1360         let mut next_block = None;
1361 
1362         loop {
1363             // Calculate the offset of the index into the block.
1364             let offset = (tail >> SHIFT) % LAP;
1365 
1366             // If we reached the end of the block, wait until the next one is installed.
1367             if offset == BLOCK_CAP {
1368                 backoff.snooze();
1369                 tail = self.tail.index.load(Ordering::Acquire);
1370                 block = self.tail.block.load(Ordering::Acquire);
1371                 continue;
1372             }
1373 
1374             // If we're going to have to install the next block, allocate it in advance in order to
1375             // make the wait for other threads as short as possible.
1376             if offset + 1 == BLOCK_CAP && next_block.is_none() {
1377                 next_block = Some(Box::new(Block::<T>::new()));
1378             }
1379 
1380             let new_tail = tail + (1 << SHIFT);
1381 
1382             // Try advancing the tail forward.
1383             match self.tail.index.compare_exchange_weak(
1384                 tail,
1385                 new_tail,
1386                 Ordering::SeqCst,
1387                 Ordering::Acquire,
1388             ) {
1389                 Ok(_) => unsafe {
1390                     // If we've reached the end of the block, install the next one.
1391                     if offset + 1 == BLOCK_CAP {
1392                         let next_block = Box::into_raw(next_block.unwrap());
1393                         let next_index = new_tail.wrapping_add(1 << SHIFT);
1394 
1395                         self.tail.block.store(next_block, Ordering::Release);
1396                         self.tail.index.store(next_index, Ordering::Release);
1397                         (*block).next.store(next_block, Ordering::Release);
1398                     }
1399 
1400                     // Write the task into the slot.
1401                     let slot = (*block).slots.get_unchecked(offset);
1402                     slot.task.get().write(MaybeUninit::new(task));
1403                     slot.state.fetch_or(WRITE, Ordering::Release);
1404 
1405                     return;
1406                 },
1407                 Err(t) => {
1408                     tail = t;
1409                     block = self.tail.block.load(Ordering::Acquire);
1410                     backoff.spin();
1411                 }
1412             }
1413         }
1414     }
1415 
1416     /// Steals a task from the queue.
1417     ///
1418     /// # Examples
1419     ///
1420     /// ```
1421     /// use crossbeam_deque::{Injector, Steal};
1422     ///
1423     /// let q = Injector::new();
1424     /// q.push(1);
1425     /// q.push(2);
1426     ///
1427     /// assert_eq!(q.steal(), Steal::Success(1));
1428     /// assert_eq!(q.steal(), Steal::Success(2));
1429     /// assert_eq!(q.steal(), Steal::Empty);
1430     /// ```
1431     pub fn steal(&self) -> Steal<T> {
1432         let mut head;
1433         let mut block;
1434         let mut offset;
1435 
1436         let backoff = Backoff::new();
1437         loop {
1438             head = self.head.index.load(Ordering::Acquire);
1439             block = self.head.block.load(Ordering::Acquire);
1440 
1441             // Calculate the offset of the index into the block.
1442             offset = (head >> SHIFT) % LAP;
1443 
1444             // If we reached the end of the block, wait until the next one is installed.
1445             if offset == BLOCK_CAP {
1446                 backoff.snooze();
1447             } else {
1448                 break;
1449             }
1450         }
1451 
1452         let mut new_head = head + (1 << SHIFT);
1453 
1454         if new_head & HAS_NEXT == 0 {
1455             atomic::fence(Ordering::SeqCst);
1456             let tail = self.tail.index.load(Ordering::Relaxed);
1457 
1458             // If the tail equals the head, that means the queue is empty.
1459             if head >> SHIFT == tail >> SHIFT {
1460                 return Steal::Empty;
1461             }
1462 
1463             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1464             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1465                 new_head |= HAS_NEXT;
1466             }
1467         }
1468 
1469         // Try moving the head index forward.
1470         if self
1471             .head
1472             .index
1473             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1474             .is_err()
1475         {
1476             return Steal::Retry;
1477         }
1478 
1479         unsafe {
1480             // If we've reached the end of the block, move to the next one.
1481             if offset + 1 == BLOCK_CAP {
1482                 let next = (*block).wait_next();
1483                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1484                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1485                     next_index |= HAS_NEXT;
1486                 }
1487 
1488                 self.head.block.store(next, Ordering::Release);
1489                 self.head.index.store(next_index, Ordering::Release);
1490             }
1491 
1492             // Read the task.
1493             let slot = (*block).slots.get_unchecked(offset);
1494             slot.wait_write();
1495             let task = slot.task.get().read().assume_init();
1496 
1497             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1498             // but couldn't because we were busy reading from the slot.
1499             if (offset + 1 == BLOCK_CAP)
1500                 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1501             {
1502                 Block::destroy(block, offset);
1503             }
1504 
1505             Steal::Success(task)
1506         }
1507     }
1508 
1509     /// Steals a batch of tasks and pushes them into a worker.
1510     ///
1511     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1512     /// steal around half of the tasks in the queue, but also not more than some constant limit.
1513     ///
1514     /// # Examples
1515     ///
1516     /// ```
1517     /// use crossbeam_deque::{Injector, Worker};
1518     ///
1519     /// let q = Injector::new();
1520     /// q.push(1);
1521     /// q.push(2);
1522     /// q.push(3);
1523     /// q.push(4);
1524     ///
1525     /// let w = Worker::new_fifo();
1526     /// let _ = q.steal_batch(&w);
1527     /// assert_eq!(w.pop(), Some(1));
1528     /// assert_eq!(w.pop(), Some(2));
1529     /// ```
1530     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1531         self.steal_batch_with_limit(dest, MAX_BATCH)
1532     }
1533 
1534     /// Steals no more than `limit` tasks and pushes them into a worker.
1535     ///
1536     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1537     /// steal around half of the tasks in the queue, but also not more than the given limit.
1538     ///
1539     /// # Examples
1540     ///
1541     /// ```
1542     /// use crossbeam_deque::{Injector, Worker};
1543     ///
1544     /// let q = Injector::new();
1545     /// q.push(1);
1546     /// q.push(2);
1547     /// q.push(3);
1548     /// q.push(4);
1549     /// q.push(5);
1550     /// q.push(6);
1551     ///
1552     /// let w = Worker::new_fifo();
1553     /// let _ = q.steal_batch_with_limit(&w, 2);
1554     /// assert_eq!(w.pop(), Some(1));
1555     /// assert_eq!(w.pop(), Some(2));
1556     /// assert_eq!(w.pop(), None);
1557     ///
1558     /// q.push(7);
1559     /// q.push(8);
1560     /// // Setting a large limit does not guarantee that all elements will be stolen. In this case,
1561     /// // about half of the remaining elements are stolen, but the exact number is considered an
1562     /// // implementation detail that may change in the future.
1563     /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1564     /// assert_eq!(w.len(), 3);
1565     /// ```
1566     pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1567         assert!(limit > 0);
1568         let mut head;
1569         let mut block;
1570         let mut offset;
1571 
1572         let backoff = Backoff::new();
1573         loop {
1574             head = self.head.index.load(Ordering::Acquire);
1575             block = self.head.block.load(Ordering::Acquire);
1576 
1577             // Calculate the offset of the index into the block.
1578             offset = (head >> SHIFT) % LAP;
1579 
1580             // If we reached the end of the block, wait until the next one is installed.
1581             if offset == BLOCK_CAP {
1582                 backoff.snooze();
1583             } else {
1584                 break;
1585             }
1586         }
1587 
1588         let mut new_head = head;
1589         let advance;
1590 
1591         if new_head & HAS_NEXT == 0 {
1592             atomic::fence(Ordering::SeqCst);
1593             let tail = self.tail.index.load(Ordering::Relaxed);
1594 
1595             // If the tail equals the head, that means the queue is empty.
1596             if head >> SHIFT == tail >> SHIFT {
1597                 return Steal::Empty;
1598             }
1599 
1600             // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1601             // the right batch size to steal.
1602             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1603                 new_head |= HAS_NEXT;
1604                 // We can steal all tasks till the end of the block.
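                // (The tail has already moved past this block, so every remaining slot in it has
                // been claimed by a `push`; `wait_write` below handles writes that are still in
                // flight.)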
1605                 advance = (BLOCK_CAP - offset).min(limit);
1606             } else {
1607                 let len = (tail - head) >> SHIFT;
1608                 // Steal half of the available tasks.
1609                 advance = ((len + 1) / 2).min(limit);
1610             }
1611         } else {
1612             // We can steal all tasks till the end of the block.
1613             advance = (BLOCK_CAP - offset).min(limit);
1614         }
1615 
1616         new_head += advance << SHIFT;
1617         let new_offset = offset + advance;
1618 
1619         // Try moving the head index forward.
1620         if self
1621             .head
1622             .index
1623             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1624             .is_err()
1625         {
1626             return Steal::Retry;
1627         }
1628 
1629         // Reserve capacity for the stolen batch.
1630         let batch_size = new_offset - offset;
1631         dest.reserve(batch_size);
1632 
1633         // Get the destination buffer and back index.
1634         let dest_buffer = dest.buffer.get();
1635         let dest_b = dest.inner.back.load(Ordering::Relaxed);
1636 
1637         unsafe {
1638             // If we've reached the end of the block, move to the next one.
1639             if new_offset == BLOCK_CAP {
1640                 let next = (*block).wait_next();
1641                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1642                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1643                     next_index |= HAS_NEXT;
1644                 }
1645 
1646                 self.head.block.store(next, Ordering::Release);
1647                 self.head.index.store(next_index, Ordering::Release);
1648             }
1649 
1650             // Copy values from the injector into the destination queue.
1651             match dest.flavor {
1652                 Flavor::Fifo => {
1653                     for i in 0..batch_size {
1654                         // Read the task.
1655                         let slot = (*block).slots.get_unchecked(offset + i);
1656                         slot.wait_write();
1657                         let task = slot.task.get().read();
1658 
1659                         // Write it into the destination queue.
1660                         dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1661                     }
1662                 }
1663 
1664                 Flavor::Lifo => {
1665                     for i in 0..batch_size {
1666                         // Read the task.
1667                         let slot = (*block).slots.get_unchecked(offset + i);
1668                         slot.wait_write();
1669                         let task = slot.task.get().read();
1670 
1671                         // Write it into the destination queue.
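                        // (Writing in reverse order puts the oldest stolen task at the back of the
                        // LIFO worker, so it is popped first and the injector's FIFO order is
                        // preserved.)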
1672                         dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1673                     }
1674                 }
1675             }
1676 
1677             atomic::fence(Ordering::Release);
1678 
1679             // Update the back index in the destination queue.
1680             //
1681             // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1682             // data races because it doesn't understand fences.
1683             dest.inner
1684                 .back
1685                 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1686 
1687             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1688             // but couldn't because we were busy reading from the slot.
1689             if new_offset == BLOCK_CAP {
1690                 Block::destroy(block, offset);
1691             } else {
1692                 for i in offset..new_offset {
1693                     let slot = (*block).slots.get_unchecked(i);
1694 
1695                     if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1696                         Block::destroy(block, offset);
1697                         break;
1698                     }
1699                 }
1700             }
1701 
1702             Steal::Success(())
1703         }
1704     }
1705 
1706     /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1707     ///
1708     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1709     /// steal around half of the tasks in the queue, but also not more than some constant limit.
1710     ///
1711     /// # Examples
1712     ///
1713     /// ```
1714     /// use crossbeam_deque::{Injector, Steal, Worker};
1715     ///
1716     /// let q = Injector::new();
1717     /// q.push(1);
1718     /// q.push(2);
1719     /// q.push(3);
1720     /// q.push(4);
1721     ///
1722     /// let w = Worker::new_fifo();
1723     /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1724     /// assert_eq!(w.pop(), Some(2));
1725     /// ```
1726     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1727         // TODO: we use `MAX_BATCH + 1` as the hard limit for `Injector` because performance is slightly
1728         // better, but we may change it in the future to be compatible with the same method in `Stealer`.
1729         self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1730     }
1731 
1732     /// Steals no more than `limit` tasks, pushes them into a worker, and pops a task from that worker.
1733     ///
1734     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1735     /// steal around half of the tasks in the queue, but also not more than the given limit.
1736     ///
1737     /// # Examples
1738     ///
1739     /// ```
1740     /// use crossbeam_deque::{Injector, Steal, Worker};
1741     ///
1742     /// let q = Injector::new();
1743     /// q.push(1);
1744     /// q.push(2);
1745     /// q.push(3);
1746     /// q.push(4);
1747     /// q.push(5);
1748     /// q.push(6);
1749     ///
1750     /// let w = Worker::new_fifo();
1751     /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1752     /// assert_eq!(w.pop(), Some(2));
1753     /// assert_eq!(w.pop(), None);
1754     ///
1755     /// q.push(7);
1756     /// // Setting a large limit does not guarantee that all elements will be stolen. In this case,
1757     /// // about half of the remaining elements are stolen, but the exact number is considered an
1758     /// // implementation detail that may change in the future.
1759     /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1760     /// assert_eq!(w.pop(), Some(4));
1761     /// assert_eq!(w.pop(), Some(5));
1762     /// assert_eq!(w.pop(), None);
1763     /// ```
1764     pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1765         assert!(limit > 0);
1766         let mut head;
1767         let mut block;
1768         let mut offset;
1769 
1770         let backoff = Backoff::new();
1771         loop {
1772             head = self.head.index.load(Ordering::Acquire);
1773             block = self.head.block.load(Ordering::Acquire);
1774 
1775             // Calculate the offset of the index into the block.
1776             offset = (head >> SHIFT) % LAP;
1777 
1778             // If we reached the end of the block, wait until the next one is installed.
1779             if offset == BLOCK_CAP {
1780                 backoff.snooze();
1781             } else {
1782                 break;
1783             }
1784         }
1785 
1786         let mut new_head = head;
1787         let advance;
1788 
1789         if new_head & HAS_NEXT == 0 {
1790             atomic::fence(Ordering::SeqCst);
1791             let tail = self.tail.index.load(Ordering::Relaxed);
1792 
1793             // If the tail equals the head, that means the queue is empty.
1794             if head >> SHIFT == tail >> SHIFT {
1795                 return Steal::Empty;
1796             }
1797 
1798             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1799             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1800                 new_head |= HAS_NEXT;
1801                 // We can steal all tasks till the end of the block.
1802                 advance = (BLOCK_CAP - offset).min(limit);
1803             } else {
1804                 let len = (tail - head) >> SHIFT;
1805                 // Steal half of the available tasks.
1806                 advance = ((len + 1) / 2).min(limit);
1807             }
1808         } else {
1809             // We can steal all tasks till the end of the block.
1810             advance = (BLOCK_CAP - offset).min(limit);
1811         }
1812 
1813         new_head += advance << SHIFT;
1814         let new_offset = offset + advance;
1815 
1816         // Try moving the head index forward.
1817         if self
1818             .head
1819             .index
1820             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1821             .is_err()
1822         {
1823             return Steal::Retry;
1824         }
1825 
1826         // Reserve capacity for the stolen batch.
1827         let batch_size = new_offset - offset - 1;
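        // (`- 1` because the task at `offset` is returned directly from this call instead of being
        // pushed into `dest`.)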
1828         dest.reserve(batch_size);
1829 
1830         // Get the destination buffer and back index.
1831         let dest_buffer = dest.buffer.get();
1832         let dest_b = dest.inner.back.load(Ordering::Relaxed);
1833 
1834         unsafe {
1835             // If we've reached the end of the block, move to the next one.
1836             if new_offset == BLOCK_CAP {
1837                 let next = (*block).wait_next();
1838                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1839                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1840                     next_index |= HAS_NEXT;
1841                 }
1842 
1843                 self.head.block.store(next, Ordering::Release);
1844                 self.head.index.store(next_index, Ordering::Release);
1845             }
1846 
1847             // Read the task.
1848             let slot = (*block).slots.get_unchecked(offset);
1849             slot.wait_write();
1850             let task = slot.task.get().read();
1851 
1852             match dest.flavor {
1853                 Flavor::Fifo => {
1854                     // Copy values from the injector into the destination queue.
1855                     for i in 0..batch_size {
1856                         // Read the task.
1857                         let slot = (*block).slots.get_unchecked(offset + i + 1);
1858                         slot.wait_write();
1859                         let task = slot.task.get().read();
1860 
1861                         // Write it into the destination queue.
1862                         dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1863                     }
1864                 }
1865 
1866                 Flavor::Lifo => {
1867                     // Copy values from the injector into the destination queue.
1868                     for i in 0..batch_size {
1869                         // Read the task.
1870                         let slot = (*block).slots.get_unchecked(offset + i + 1);
1871                         slot.wait_write();
1872                         let task = slot.task.get().read();
1873 
1874                         // Write it into the destination queue.
1875                         dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1876                     }
1877                 }
1878             }
1879 
1880             atomic::fence(Ordering::Release);
1881 
1882             // Update the back index in the destination queue.
1883             //
1884             // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1885             // data races because it doesn't understand fences.
1886             dest.inner
1887                 .back
1888                 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1889 
1890             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1891             // but couldn't because we were busy reading from the slot.
1892             if new_offset == BLOCK_CAP {
1893                 Block::destroy(block, offset);
1894             } else {
1895                 for i in offset..new_offset {
1896                     let slot = (*block).slots.get_unchecked(i);
1897 
1898                     if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1899                         Block::destroy(block, offset);
1900                         break;
1901                     }
1902                 }
1903             }
1904 
1905             Steal::Success(task.assume_init())
1906         }
1907     }
1908 
1909     /// Returns `true` if the queue is empty.
1910     ///
1911     /// # Examples
1912     ///
1913     /// ```
1914     /// use crossbeam_deque::Injector;
1915     ///
1916     /// let q = Injector::new();
1917     ///
1918     /// assert!(q.is_empty());
1919     /// q.push(1);
1920     /// assert!(!q.is_empty());
1921     /// ```
1922     pub fn is_empty(&self) -> bool {
1923         let head = self.head.index.load(Ordering::SeqCst);
1924         let tail = self.tail.index.load(Ordering::SeqCst);
1925         head >> SHIFT == tail >> SHIFT
1926     }
1927 
1928     /// Returns the number of tasks in the queue.
1929     ///
1930     /// # Examples
1931     ///
1932     /// ```
1933     /// use crossbeam_deque::Injector;
1934     ///
1935     /// let q = Injector::new();
1936     ///
1937     /// assert_eq!(q.len(), 0);
1938     /// q.push(1);
1939     /// assert_eq!(q.len(), 1);
1940     /// q.push(1);
1941     /// assert_eq!(q.len(), 2);
1942     /// ```
1943     pub fn len(&self) -> usize {
1944         loop {
1945             // Load the tail index, then load the head index.
1946             let mut tail = self.tail.index.load(Ordering::SeqCst);
1947             let mut head = self.head.index.load(Ordering::SeqCst);
1948 
1949             // If the tail index didn't change, we've got consistent indices to work with.
1950             if self.tail.index.load(Ordering::SeqCst) == tail {
1951                 // Erase the lower bits.
1952                 tail &= !((1 << SHIFT) - 1);
1953                 head &= !((1 << SHIFT) - 1);
1954 
1955                 // Fix up indices if they fall onto block ends.
1956                 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1957                     tail = tail.wrapping_add(1 << SHIFT);
1958                 }
1959                 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1960                     head = head.wrapping_add(1 << SHIFT);
1961                 }
1962 
1963                 // Rotate indices so that head falls into the first block.
1964                 let lap = (head >> SHIFT) / LAP;
1965                 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1966                 head = head.wrapping_sub((lap * LAP) << SHIFT);
1967 
1968                 // Remove the lower bits.
1969                 tail >>= SHIFT;
1970                 head >>= SHIFT;
1971 
1972                 // Return the difference minus the number of blocks between tail and head.
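                // (Each lap of `LAP` indices backs only `BLOCK_CAP` slots; the remaining index per
                // lap is skipped when moving to the next block, so one index is subtracted for
                // every block boundary between `head` and `tail`.)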
1973                 return tail - head - tail / LAP;
1974             }
1975         }
1976     }
1977 }
1978 
1979 impl<T> Drop for Injector<T> {
1980     fn drop(&mut self) {
1981         let mut head = *self.head.index.get_mut();
1982         let mut tail = *self.tail.index.get_mut();
1983         let mut block = *self.head.block.get_mut();
1984 
1985         // Erase the lower bits.
1986         head &= !((1 << SHIFT) - 1);
1987         tail &= !((1 << SHIFT) - 1);
1988 
1989         unsafe {
1990             // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1991             while head != tail {
1992                 let offset = (head >> SHIFT) % LAP;
1993 
1994                 if offset < BLOCK_CAP {
1995                     // Drop the task in the slot.
1996                     let slot = (*block).slots.get_unchecked(offset);
1997                     (*slot.task.get()).assume_init_drop();
1998                 } else {
1999                     // Deallocate the block and move to the next one.
2000                     let next = *(*block).next.get_mut();
2001                     drop(Box::from_raw(block));
2002                     block = next;
2003                 }
2004 
2005                 head = head.wrapping_add(1 << SHIFT);
2006             }
2007 
2008             // Deallocate the last remaining block.
2009             drop(Box::from_raw(block));
2010         }
2011     }
2012 }
2013 
2014 impl<T> fmt::Debug for Injector<T> {
2015     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2016         f.pad("Worker { .. }")
2017     }
2018 }
2019 
2020 /// Possible outcomes of a steal operation.
2021 ///
2022 /// # Examples
2023 ///
2024 /// There are lots of ways to chain results of steal operations together:
2025 ///
2026 /// ```
2027 /// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2028 ///
2029 /// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2030 ///
2031 /// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2032 /// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2033 /// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2034 ///
2035 /// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2036 /// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2037 /// ```
2038 #[must_use]
2039 #[derive(PartialEq, Eq, Copy, Clone)]
2040 pub enum Steal<T> {
2041     /// The queue was empty at the time of stealing.
2042     Empty,
2043 
2044     /// At least one task was successfully stolen.
2045     Success(T),
2046 
2047     /// The steal operation needs to be retried.
2048     Retry,
2049 }
2050 
2051 impl<T> Steal<T> {
2052     /// Returns `true` if the queue was empty at the time of stealing.
2053     ///
2054     /// # Examples
2055     ///
2056     /// ```
2057     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2058     ///
2059     /// assert!(!Success(7).is_empty());
2060     /// assert!(!Retry::<i32>.is_empty());
2061     ///
2062     /// assert!(Empty::<i32>.is_empty());
2063     /// ```
2064     pub fn is_empty(&self) -> bool {
2065         match self {
2066             Steal::Empty => true,
2067             _ => false,
2068         }
2069     }
2070 
2071     /// Returns `true` if at least one task was stolen.
2072     ///
2073     /// # Examples
2074     ///
2075     /// ```
2076     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2077     ///
2078     /// assert!(!Empty::<i32>.is_success());
2079     /// assert!(!Retry::<i32>.is_success());
2080     ///
2081     /// assert!(Success(7).is_success());
2082     /// ```
2083     pub fn is_success(&self) -> bool {
2084         match self {
2085             Steal::Success(_) => true,
2086             _ => false,
2087         }
2088     }
2089 
2090     /// Returns `true` if the steal operation needs to be retried.
2091     ///
2092     /// # Examples
2093     ///
2094     /// ```
2095     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2096     ///
2097     /// assert!(!Empty::<i32>.is_retry());
2098     /// assert!(!Success(7).is_retry());
2099     ///
2100     /// assert!(Retry::<i32>.is_retry());
2101     /// ```
2102     pub fn is_retry(&self) -> bool {
2103         match self {
2104             Steal::Retry => true,
2105             _ => false,
2106         }
2107     }
2108 
2109     /// Returns the result of the operation, if successful.
2110     ///
2111     /// # Examples
2112     ///
2113     /// ```
2114     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2115     ///
2116     /// assert_eq!(Empty::<i32>.success(), None);
2117     /// assert_eq!(Retry::<i32>.success(), None);
2118     ///
2119     /// assert_eq!(Success(7).success(), Some(7));
2120     /// ```
2121     pub fn success(self) -> Option<T> {
2122         match self {
2123             Steal::Success(res) => Some(res),
2124             _ => None,
2125         }
2126     }
2127 
2128     /// If no task was stolen, attempts another steal operation.
2129     ///
2130     /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2131     ///
2132     /// * If the second steal resulted in `Success`, it is returned.
2133     /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2134     /// * If both resulted in `Empty`, then `Empty` is returned.
2135     ///
2136     /// # Examples
2137     ///
2138     /// ```
2139     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2140     ///
2141     /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2142     /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2143     ///
2144     /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2145     /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2146     ///
2147     /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2148     /// ```
2149     pub fn or_else<F>(self, f: F) -> Steal<T>
2150     where
2151         F: FnOnce() -> Steal<T>,
2152     {
2153         match self {
2154             Steal::Empty => f(),
2155             Steal::Success(_) => self,
2156             Steal::Retry => {
2157                 if let Steal::Success(res) = f() {
2158                     Steal::Success(res)
2159                 } else {
2160                     Steal::Retry
2161                 }
2162             }
2163         }
2164     }
2165 }
2166 
2167 impl<T> fmt::Debug for Steal<T> {
2168     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2169         match self {
2170             Steal::Empty => f.pad("Empty"),
2171             Steal::Success(_) => f.pad("Success(..)"),
2172             Steal::Retry => f.pad("Retry"),
2173         }
2174     }
2175 }
2176 
2177 impl<T> FromIterator<Steal<T>> for Steal<T> {
2178     /// Consumes items until a `Success` is found and returns it.
2179     ///
2180     /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2181     /// Otherwise, `Empty` is returned.
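    ///
    /// # Examples
    ///
    /// A small illustration of these rules (mirroring the enum-level example):
    ///
    /// ```
    /// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
    ///
    /// assert_eq!(vec![Retry, Empty, Success(7)].into_iter().collect::<Steal<i32>>(), Success(7));
    /// assert_eq!(vec![Empty, Retry, Empty].into_iter().collect::<Steal<i32>>(), Retry);
    /// assert_eq!(vec![Empty, Empty].into_iter().collect::<Steal<i32>>(), Empty);
    /// ```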
2182     fn from_iter<I>(iter: I) -> Steal<T>
2183     where
2184         I: IntoIterator<Item = Steal<T>>,
2185     {
2186         let mut retry = false;
2187         for s in iter {
2188             match &s {
2189                 Steal::Empty => {}
2190                 Steal::Success(_) => return s,
2191                 Steal::Retry => retry = true,
2192             }
2193         }
2194 
2195         if retry {
2196             Steal::Retry
2197         } else {
2198             Steal::Empty
2199         }
2200     }
2201 }
2202