1 use super::{Pop, Synced}; 2 3 use crate::loom::sync::atomic::AtomicUsize; 4 use crate::runtime::task; 5 6 use std::marker::PhantomData; 7 use std::sync::atomic::Ordering::{Acquire, Release}; 8 9 pub(crate) struct Shared<T: 'static> { 10 /// Number of pending tasks in the queue. This helps prevent unnecessary 11 /// locking in the hot path. 12 pub(super) len: AtomicUsize, 13 14 _p: PhantomData<T>, 15 } 16 17 unsafe impl<T> Send for Shared<T> {} 18 unsafe impl<T> Sync for Shared<T> {} 19 20 impl<T: 'static> Shared<T> { new() -> (Shared<T>, Synced)21 pub(crate) fn new() -> (Shared<T>, Synced) { 22 let inject = Shared { 23 len: AtomicUsize::new(0), 24 _p: PhantomData, 25 }; 26 27 let synced = Synced { 28 is_closed: false, 29 head: None, 30 tail: None, 31 }; 32 33 (inject, synced) 34 } 35 is_empty(&self) -> bool36 pub(crate) fn is_empty(&self) -> bool { 37 self.len() == 0 38 } 39 40 // Kind of annoying to have to include the cfg here 41 #[cfg(any(tokio_taskdump, feature = "rt-multi-thread"))] is_closed(&self, synced: &Synced) -> bool42 pub(crate) fn is_closed(&self, synced: &Synced) -> bool { 43 synced.is_closed 44 } 45 46 /// Closes the injection queue, returns `true` if the queue is open when the 47 /// transition is made. close(&self, synced: &mut Synced) -> bool48 pub(crate) fn close(&self, synced: &mut Synced) -> bool { 49 if synced.is_closed { 50 return false; 51 } 52 53 synced.is_closed = true; 54 true 55 } 56 len(&self) -> usize57 pub(crate) fn len(&self) -> usize { 58 self.len.load(Acquire) 59 } 60 61 /// Pushes a value into the queue. 62 /// 63 /// This does nothing if the queue is closed. 64 /// 65 /// # Safety 66 /// 67 /// Must be called with the same `Synced` instance returned by `Inject::new` push(&self, synced: &mut Synced, task: task::Notified<T>)68 pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) { 69 if synced.is_closed { 70 return; 71 } 72 73 // safety: only mutated with the lock held 74 let len = self.len.unsync_load(); 75 let task = task.into_raw(); 76 77 // The next pointer should already be null 78 debug_assert!(unsafe { task.get_queue_next().is_none() }); 79 80 if let Some(tail) = synced.tail { 81 // safety: Holding the Notified for a task guarantees exclusive 82 // access to the `queue_next` field. 83 unsafe { tail.set_queue_next(Some(task)) }; 84 } else { 85 synced.head = Some(task); 86 } 87 88 synced.tail = Some(task); 89 self.len.store(len + 1, Release); 90 } 91 92 /// Pop a value from the queue. 93 /// 94 /// # Safety 95 /// 96 /// Must be called with the same `Synced` instance returned by `Inject::new` pop(&self, synced: &mut Synced) -> Option<task::Notified<T>>97 pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> { 98 self.pop_n(synced, 1).next() 99 } 100 101 /// Pop `n` values from the queue 102 /// 103 /// # Safety 104 /// 105 /// Must be called with the same `Synced` instance returned by `Inject::new` pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T>106 pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> { 107 use std::cmp; 108 109 debug_assert!(n > 0); 110 111 // safety: All updates to the len atomic are guarded by the mutex. As 112 // such, a non-atomic load followed by a store is safe. 113 let len = self.len.unsync_load(); 114 let n = cmp::min(n, len); 115 116 // Decrement the count. 117 self.len.store(len - n, Release); 118 119 Pop::new(n, synced) 120 } 121 } 122