1 use super::{Pop, Synced};
2 
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::runtime::task;
5 
6 use std::marker::PhantomData;
7 use std::sync::atomic::Ordering::{Acquire, Release};
8 
9 pub(crate) struct Shared<T: 'static> {
10     /// Number of pending tasks in the queue. This helps prevent unnecessary
11     /// locking in the hot path.
12     pub(super) len: AtomicUsize,
13 
14     _p: PhantomData<T>,
15 }
16 
17 unsafe impl<T> Send for Shared<T> {}
18 unsafe impl<T> Sync for Shared<T> {}
19 
20 impl<T: 'static> Shared<T> {
new() -> (Shared<T>, Synced)21     pub(crate) fn new() -> (Shared<T>, Synced) {
22         let inject = Shared {
23             len: AtomicUsize::new(0),
24             _p: PhantomData,
25         };
26 
27         let synced = Synced {
28             is_closed: false,
29             head: None,
30             tail: None,
31         };
32 
33         (inject, synced)
34     }
35 
is_empty(&self) -> bool36     pub(crate) fn is_empty(&self) -> bool {
37         self.len() == 0
38     }
39 
40     // Kind of annoying to have to include the cfg here
41     #[cfg(any(tokio_taskdump, feature = "rt-multi-thread"))]
is_closed(&self, synced: &Synced) -> bool42     pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
43         synced.is_closed
44     }
45 
46     /// Closes the injection queue, returns `true` if the queue is open when the
47     /// transition is made.
close(&self, synced: &mut Synced) -> bool48     pub(crate) fn close(&self, synced: &mut Synced) -> bool {
49         if synced.is_closed {
50             return false;
51         }
52 
53         synced.is_closed = true;
54         true
55     }
56 
len(&self) -> usize57     pub(crate) fn len(&self) -> usize {
58         self.len.load(Acquire)
59     }
60 
61     /// Pushes a value into the queue.
62     ///
63     /// This does nothing if the queue is closed.
64     ///
65     /// # Safety
66     ///
67     /// Must be called with the same `Synced` instance returned by `Inject::new`
push(&self, synced: &mut Synced, task: task::Notified<T>)68     pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
69         if synced.is_closed {
70             return;
71         }
72 
73         // safety: only mutated with the lock held
74         let len = self.len.unsync_load();
75         let task = task.into_raw();
76 
77         // The next pointer should already be null
78         debug_assert!(unsafe { task.get_queue_next().is_none() });
79 
80         if let Some(tail) = synced.tail {
81             // safety: Holding the Notified for a task guarantees exclusive
82             // access to the `queue_next` field.
83             unsafe { tail.set_queue_next(Some(task)) };
84         } else {
85             synced.head = Some(task);
86         }
87 
88         synced.tail = Some(task);
89         self.len.store(len + 1, Release);
90     }
91 
92     /// Pop a value from the queue.
93     ///
94     /// # Safety
95     ///
96     /// Must be called with the same `Synced` instance returned by `Inject::new`
pop(&self, synced: &mut Synced) -> Option<task::Notified<T>>97     pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
98         self.pop_n(synced, 1).next()
99     }
100 
101     /// Pop `n` values from the queue
102     ///
103     /// # Safety
104     ///
105     /// Must be called with the same `Synced` instance returned by `Inject::new`
pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T>106     pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
107         use std::cmp;
108 
109         debug_assert!(n > 0);
110 
111         // safety: All updates to the len atomic are guarded by the mutex. As
112         // such, a non-atomic load followed by a store is safe.
113         let len = self.len.unsync_load();
114         let n = cmp::min(n, len);
115 
116         // Decrement the count.
117         self.len.store(len - n, Release);
118 
119         Pop::new(n, synced)
120     }
121 }
122