1 use crate::loom::sync::atomic::AtomicUsize;
2 
3 use std::fmt;
4 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5 
/// Atomic task state word.
///
/// A single `usize` packs the lifecycle and flag bits (covered by
/// `STATE_MASK`) in the low bits and the task's reference count in the
/// remaining high bits (covered by `REF_COUNT_MASK`).
pub(super) struct State {
    // Every mutation in this file goes through an atomic read-modify-write on
    // this word.
    val: AtomicUsize,
}
9 
/// Current state value.
///
/// A plain copy of the state word as observed at one point in time. The
/// accessors on this type decode flag bits and the reference count; the
/// mutators only change this local copy, never the shared atomic.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
13 
/// Outcome of a conditional state update performed by `State::fetch_update`:
/// `Ok` carries the snapshot that was stored, `Err` carries the snapshot that
/// was observed when the update was declined.
type UpdateResult = Result<Snapshot, Snapshot>;
15 
/// Bit 0: the task is currently being run.
const RUNNING: usize = 1 << 0;

/// Bit 1: the task is complete.
///
/// Once this bit is set, it is never unset.
const COMPLETE: usize = 1 << 1;

/// Selects the two lifecycle bits (`RUNNING` and `COMPLETE`) of the state.
const LIFECYCLE_MASK: usize = RUNNING | COMPLETE;

/// Bit 2: tracks whether the task has been pushed into a run queue.
const NOTIFIED: usize = 1 << 2;

/// Bit 3: the join handle is still around.
const JOIN_INTEREST: usize = 1 << 3;

/// Bit 4: a join handle waker has been set.
const JOIN_WAKER: usize = 1 << 4;

/// Bit 5: the task has been forcibly cancelled.
const CANCELLED: usize = 1 << 5;

/// Union of every flag bit defined above.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Every bit that is not a flag bit; these bits hold the ref count.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// How far the ref count is shifted left inside the state word: one position
/// per bit that `REF_COUNT_MASK` leaves clear.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// The state-word increment that corresponds to a single reference.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

/// The value a new task's state word starts with.
///
/// A task begins life holding three references:
///
///  * One that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
///  * One that will be sent to the scheduler as an ordinary notification.
///  * One for the `JoinHandle`.
///
/// Because the task starts with a `Notified`, `NOTIFIED` is set.
/// Because the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
const INITIAL_STATE: usize = (3 * REF_ONE) | NOTIFIED | JOIN_INTEREST;
62 
/// Outcome of `State::transition_to_running`.
#[must_use]
pub(super) enum TransitionToRunning {
    /// The task was idle; `RUNNING` was set, `NOTIFIED` was cleared and the
    /// cancelled flag is not set.
    Success,
    /// The task was idle and `RUNNING` was set, but the cancelled flag is set.
    Cancelled,
    /// The task was not idle; one reference was dropped and others remain.
    Failed,
    /// The task was not idle and the reference dropped was the last one.
    Dealloc,
}
70 
/// Outcome of `State::transition_to_idle`.
#[must_use]
pub(super) enum TransitionToIdle {
    /// `RUNNING` was cleared, the task was not notified, one reference was
    /// dropped and others remain.
    Ok,
    /// `RUNNING` was cleared while `NOTIFIED` was set; a reference was added
    /// for the notification the caller is expected to schedule.
    OkNotified,
    /// `RUNNING` was cleared, the task was not notified, and the reference
    /// dropped was the last one.
    OkDealloc,
    /// The cancelled flag was set, so the state was left unchanged.
    Cancelled,
}
78 
/// Outcome of `State::transition_to_notified_by_val`.
#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    /// Nothing has to be submitted; a reference was consumed and at least one
    /// other reference remains.
    DoNothing,
    /// `NOTIFIED` was newly set on an idle task and a reference was added for
    /// the notification to submit.
    Submit,
    /// Nothing has to be submitted and the reference consumed was the last
    /// one.
    Dealloc,
}
85 
/// Outcome of `State::transition_to_notified_by_ref`.
#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    /// Nothing has to be submitted: the task was already complete or
    /// notified, or it is running and only had `NOTIFIED` set.
    DoNothing,
    /// `NOTIFIED` was newly set on an idle task and a reference was added for
    /// the notification to submit.
    Submit,
}
91 
92 /// All transitions are performed via RMW operations. This establishes an
93 /// unambiguous modification order.
94 impl State {
95     /// Returns a task's initial state.
new() -> State96     pub(super) fn new() -> State {
97         // The raw task returned by this method has a ref-count of three. See
98         // the comment on INITIAL_STATE for more.
99         State {
100             val: AtomicUsize::new(INITIAL_STATE),
101         }
102     }
103 
104     /// Loads the current state, establishes `Acquire` ordering.
load(&self) -> Snapshot105     pub(super) fn load(&self) -> Snapshot {
106         Snapshot(self.val.load(Acquire))
107     }
108 
109     /// Attempts to transition the lifecycle to `Running`. This sets the
110     /// notified bit to false so notifications during the poll can be detected.
transition_to_running(&self) -> TransitionToRunning111     pub(super) fn transition_to_running(&self) -> TransitionToRunning {
112         self.fetch_update_action(|mut next| {
113             let action;
114             assert!(next.is_notified());
115 
116             if !next.is_idle() {
117                 // This happens if the task is either currently running or if it
118                 // has already completed, e.g. if it was cancelled during
119                 // shutdown. Consume the ref-count and return.
120                 next.ref_dec();
121                 if next.ref_count() == 0 {
122                     action = TransitionToRunning::Dealloc;
123                 } else {
124                     action = TransitionToRunning::Failed;
125                 }
126             } else {
127                 // We are able to lock the RUNNING bit.
128                 next.set_running();
129                 next.unset_notified();
130 
131                 if next.is_cancelled() {
132                     action = TransitionToRunning::Cancelled;
133                 } else {
134                     action = TransitionToRunning::Success;
135                 }
136             }
137             (action, Some(next))
138         })
139     }
140 
141     /// Transitions the task from `Running` -> `Idle`.
142     ///
143     /// The transition to `Idle` fails if the task has been flagged to be
144     /// cancelled.
transition_to_idle(&self) -> TransitionToIdle145     pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
146         self.fetch_update_action(|curr| {
147             assert!(curr.is_running());
148 
149             if curr.is_cancelled() {
150                 return (TransitionToIdle::Cancelled, None);
151             }
152 
153             let mut next = curr;
154             let action;
155             next.unset_running();
156 
157             if !next.is_notified() {
158                 // Polling the future consumes the ref-count of the Notified.
159                 next.ref_dec();
160                 if next.ref_count() == 0 {
161                     action = TransitionToIdle::OkDealloc;
162                 } else {
163                     action = TransitionToIdle::Ok;
164                 }
165             } else {
166                 // The caller will schedule a new notification, so we create a
167                 // new ref-count for the notification. Our own ref-count is kept
168                 // for now, and the caller will drop it shortly.
169                 next.ref_inc();
170                 action = TransitionToIdle::OkNotified;
171             }
172 
173             (action, Some(next))
174         })
175     }
176 
177     /// Transitions the task from `Running` -> `Complete`.
transition_to_complete(&self) -> Snapshot178     pub(super) fn transition_to_complete(&self) -> Snapshot {
179         const DELTA: usize = RUNNING | COMPLETE;
180 
181         let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
182         assert!(prev.is_running());
183         assert!(!prev.is_complete());
184 
185         Snapshot(prev.0 ^ DELTA)
186     }
187 
188     /// Transitions from `Complete` -> `Terminal`, decrementing the reference
189     /// count the specified number of times.
190     ///
191     /// Returns true if the task should be deallocated.
transition_to_terminal(&self, count: usize) -> bool192     pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
193         let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
194         assert!(
195             prev.ref_count() >= count,
196             "current: {}, sub: {}",
197             prev.ref_count(),
198             count
199         );
200         prev.ref_count() == count
201     }
202 
203     /// Transitions the state to `NOTIFIED`.
204     ///
205     /// If no task needs to be submitted, a ref-count is consumed.
206     ///
207     /// If a task needs to be submitted, the ref-count is incremented for the
208     /// new Notified.
transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal209     pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
210         self.fetch_update_action(|mut snapshot| {
211             let action;
212 
213             if snapshot.is_running() {
214                 // If the task is running, we mark it as notified, but we should
215                 // not submit anything as the thread currently running the
216                 // future is responsible for that.
217                 snapshot.set_notified();
218                 snapshot.ref_dec();
219 
220                 // The thread that set the running bit also holds a ref-count.
221                 assert!(snapshot.ref_count() > 0);
222 
223                 action = TransitionToNotifiedByVal::DoNothing;
224             } else if snapshot.is_complete() || snapshot.is_notified() {
225                 // We do not need to submit any notifications, but we have to
226                 // decrement the ref-count.
227                 snapshot.ref_dec();
228 
229                 if snapshot.ref_count() == 0 {
230                     action = TransitionToNotifiedByVal::Dealloc;
231                 } else {
232                     action = TransitionToNotifiedByVal::DoNothing;
233                 }
234             } else {
235                 // We create a new notified that we can submit. The caller
236                 // retains ownership of the ref-count they passed in.
237                 snapshot.set_notified();
238                 snapshot.ref_inc();
239                 action = TransitionToNotifiedByVal::Submit;
240             }
241 
242             (action, Some(snapshot))
243         })
244     }
245 
246     /// Transitions the state to `NOTIFIED`.
transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef247     pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
248         self.fetch_update_action(|mut snapshot| {
249             if snapshot.is_complete() || snapshot.is_notified() {
250                 // There is nothing to do in this case.
251                 (TransitionToNotifiedByRef::DoNothing, None)
252             } else if snapshot.is_running() {
253                 // If the task is running, we mark it as notified, but we should
254                 // not submit as the thread currently running the future is
255                 // responsible for that.
256                 snapshot.set_notified();
257                 (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
258             } else {
259                 // The task is idle and not notified. We should submit a
260                 // notification.
261                 snapshot.set_notified();
262                 snapshot.ref_inc();
263                 (TransitionToNotifiedByRef::Submit, Some(snapshot))
264             }
265         })
266     }
267 
268     /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
269     /// count.
270     ///
271     /// Returns `true` if the notified bit was transitioned from `0` to `1`;
272     /// otherwise `false.`
273     #[cfg(all(
274         tokio_unstable,
275         tokio_taskdump,
276         feature = "rt",
277         target_os = "linux",
278         any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
279     ))]
transition_to_notified_for_tracing(&self) -> bool280     pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
281         self.fetch_update_action(|mut snapshot| {
282             if snapshot.is_notified() {
283                 (false, None)
284             } else {
285                 snapshot.set_notified();
286                 snapshot.ref_inc();
287                 (true, Some(snapshot))
288             }
289         })
290     }
291 
292     /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
293     ///
294     /// Returns `true` if the task needs to be submitted to the pool for
295     /// execution.
transition_to_notified_and_cancel(&self) -> bool296     pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
297         self.fetch_update_action(|mut snapshot| {
298             if snapshot.is_cancelled() || snapshot.is_complete() {
299                 // Aborts to completed or cancelled tasks are no-ops.
300                 (false, None)
301             } else if snapshot.is_running() {
302                 // If the task is running, we mark it as cancelled. The thread
303                 // running the task will notice the cancelled bit when it
304                 // stops polling and it will kill the task.
305                 //
306                 // The set_notified() call is not strictly necessary but it will
307                 // in some cases let a wake_by_ref call return without having
308                 // to perform a compare_exchange.
309                 snapshot.set_notified();
310                 snapshot.set_cancelled();
311                 (false, Some(snapshot))
312             } else {
313                 // The task is idle. We set the cancelled and notified bits and
314                 // submit a notification if the notified bit was not already
315                 // set.
316                 snapshot.set_cancelled();
317                 if !snapshot.is_notified() {
318                     snapshot.set_notified();
319                     snapshot.ref_inc();
320                     (true, Some(snapshot))
321                 } else {
322                     (false, Some(snapshot))
323                 }
324             }
325         })
326     }
327 
328     /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
329     ///
330     /// Returns `true` if the transition to `Running` succeeded.
transition_to_shutdown(&self) -> bool331     pub(super) fn transition_to_shutdown(&self) -> bool {
332         let mut prev = Snapshot(0);
333 
334         let _ = self.fetch_update(|mut snapshot| {
335             prev = snapshot;
336 
337             if snapshot.is_idle() {
338                 snapshot.set_running();
339             }
340 
341             // If the task was not idle, the thread currently running the task
342             // will notice the cancelled bit and cancel it once the poll
343             // completes.
344             snapshot.set_cancelled();
345             Some(snapshot)
346         });
347 
348         prev.is_idle()
349     }
350 
351     /// Optimistically tries to swap the state assuming the join handle is
352     /// __immediately__ dropped on spawn.
drop_join_handle_fast(&self) -> Result<(), ()>353     pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
354         use std::sync::atomic::Ordering::Relaxed;
355 
356         // Relaxed is acceptable as if this function is called and succeeds,
357         // then nothing has been done w/ the join handle.
358         //
359         // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
360         // set, at which point the CAS will fail.
361         //
362         // Given this, there is no risk if this operation is reordered.
363         self.val
364             .compare_exchange_weak(
365                 INITIAL_STATE,
366                 (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
367                 Release,
368                 Relaxed,
369             )
370             .map(|_| ())
371             .map_err(|_| ())
372     }
373 
374     /// Tries to unset the `JOIN_INTEREST` flag.
375     ///
376     /// Returns `Ok` if the operation happens before the task transitions to a
377     /// completed state, `Err` otherwise.
unset_join_interested(&self) -> UpdateResult378     pub(super) fn unset_join_interested(&self) -> UpdateResult {
379         self.fetch_update(|curr| {
380             assert!(curr.is_join_interested());
381 
382             if curr.is_complete() {
383                 return None;
384             }
385 
386             let mut next = curr;
387             next.unset_join_interested();
388 
389             Some(next)
390         })
391     }
392 
393     /// Sets the `JOIN_WAKER` bit.
394     ///
395     /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
396     /// the task has completed.
set_join_waker(&self) -> UpdateResult397     pub(super) fn set_join_waker(&self) -> UpdateResult {
398         self.fetch_update(|curr| {
399             assert!(curr.is_join_interested());
400             assert!(!curr.is_join_waker_set());
401 
402             if curr.is_complete() {
403                 return None;
404             }
405 
406             let mut next = curr;
407             next.set_join_waker();
408 
409             Some(next)
410         })
411     }
412 
413     /// Unsets the `JOIN_WAKER` bit.
414     ///
415     /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if
416     /// the task has completed.
unset_waker(&self) -> UpdateResult417     pub(super) fn unset_waker(&self) -> UpdateResult {
418         self.fetch_update(|curr| {
419             assert!(curr.is_join_interested());
420             assert!(curr.is_join_waker_set());
421 
422             if curr.is_complete() {
423                 return None;
424             }
425 
426             let mut next = curr;
427             next.unset_join_waker();
428 
429             Some(next)
430         })
431     }
432 
ref_inc(&self)433     pub(super) fn ref_inc(&self) {
434         use std::process;
435         use std::sync::atomic::Ordering::Relaxed;
436 
437         // Using a relaxed ordering is alright here, as knowledge of the
438         // original reference prevents other threads from erroneously deleting
439         // the object.
440         //
441         // As explained in the [Boost documentation][1], Increasing the
442         // reference counter can always be done with memory_order_relaxed: New
443         // references to an object can only be formed from an existing
444         // reference, and passing an existing reference from one thread to
445         // another must already provide any required synchronization.
446         //
447         // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
448         let prev = self.val.fetch_add(REF_ONE, Relaxed);
449 
450         // If the reference count overflowed, abort.
451         if prev > isize::MAX as usize {
452             process::abort();
453         }
454     }
455 
456     /// Returns `true` if the task should be released.
ref_dec(&self) -> bool457     pub(super) fn ref_dec(&self) -> bool {
458         let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
459         assert!(prev.ref_count() >= 1);
460         prev.ref_count() == 1
461     }
462 
463     /// Returns `true` if the task should be released.
ref_dec_twice(&self) -> bool464     pub(super) fn ref_dec_twice(&self) -> bool {
465         let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
466         assert!(prev.ref_count() >= 2);
467         prev.ref_count() == 2
468     }
469 
fetch_update_action<F, T>(&self, mut f: F) -> T where F: FnMut(Snapshot) -> (T, Option<Snapshot>),470     fn fetch_update_action<F, T>(&self, mut f: F) -> T
471     where
472         F: FnMut(Snapshot) -> (T, Option<Snapshot>),
473     {
474         let mut curr = self.load();
475 
476         loop {
477             let (output, next) = f(curr);
478             let next = match next {
479                 Some(next) => next,
480                 None => return output,
481             };
482 
483             let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
484 
485             match res {
486                 Ok(_) => return output,
487                 Err(actual) => curr = Snapshot(actual),
488             }
489         }
490     }
491 
fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> where F: FnMut(Snapshot) -> Option<Snapshot>,492     fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
493     where
494         F: FnMut(Snapshot) -> Option<Snapshot>,
495     {
496         let mut curr = self.load();
497 
498         loop {
499             let next = match f(curr) {
500                 Some(next) => next,
501                 None => return Err(curr),
502             };
503 
504             let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
505 
506             match res {
507                 Ok(_) => return Ok(next),
508                 Err(actual) => curr = Snapshot(actual),
509             }
510         }
511     }
512 }
513 
514 // ===== impl Snapshot =====
515 
516 impl Snapshot {
517     /// Returns `true` if the task is in an idle state.
is_idle(self) -> bool518     pub(super) fn is_idle(self) -> bool {
519         self.0 & (RUNNING | COMPLETE) == 0
520     }
521 
522     /// Returns `true` if the task has been flagged as notified.
is_notified(self) -> bool523     pub(super) fn is_notified(self) -> bool {
524         self.0 & NOTIFIED == NOTIFIED
525     }
526 
unset_notified(&mut self)527     fn unset_notified(&mut self) {
528         self.0 &= !NOTIFIED;
529     }
530 
set_notified(&mut self)531     fn set_notified(&mut self) {
532         self.0 |= NOTIFIED;
533     }
534 
is_running(self) -> bool535     pub(super) fn is_running(self) -> bool {
536         self.0 & RUNNING == RUNNING
537     }
538 
set_running(&mut self)539     fn set_running(&mut self) {
540         self.0 |= RUNNING;
541     }
542 
unset_running(&mut self)543     fn unset_running(&mut self) {
544         self.0 &= !RUNNING;
545     }
546 
is_cancelled(self) -> bool547     pub(super) fn is_cancelled(self) -> bool {
548         self.0 & CANCELLED == CANCELLED
549     }
550 
set_cancelled(&mut self)551     fn set_cancelled(&mut self) {
552         self.0 |= CANCELLED;
553     }
554 
555     /// Returns `true` if the task's future has completed execution.
is_complete(self) -> bool556     pub(super) fn is_complete(self) -> bool {
557         self.0 & COMPLETE == COMPLETE
558     }
559 
is_join_interested(self) -> bool560     pub(super) fn is_join_interested(self) -> bool {
561         self.0 & JOIN_INTEREST == JOIN_INTEREST
562     }
563 
unset_join_interested(&mut self)564     fn unset_join_interested(&mut self) {
565         self.0 &= !JOIN_INTEREST;
566     }
567 
is_join_waker_set(self) -> bool568     pub(super) fn is_join_waker_set(self) -> bool {
569         self.0 & JOIN_WAKER == JOIN_WAKER
570     }
571 
set_join_waker(&mut self)572     fn set_join_waker(&mut self) {
573         self.0 |= JOIN_WAKER;
574     }
575 
unset_join_waker(&mut self)576     fn unset_join_waker(&mut self) {
577         self.0 &= !JOIN_WAKER;
578     }
579 
ref_count(self) -> usize580     pub(super) fn ref_count(self) -> usize {
581         (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
582     }
583 
ref_inc(&mut self)584     fn ref_inc(&mut self) {
585         assert!(self.0 <= isize::MAX as usize);
586         self.0 += REF_ONE;
587     }
588 
ref_dec(&mut self)589     pub(super) fn ref_dec(&mut self) {
590         assert!(self.ref_count() > 0);
591         self.0 -= REF_ONE;
592     }
593 }
594 
595 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result596     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
597         let snapshot = self.load();
598         snapshot.fmt(fmt)
599     }
600 }
601 
602 impl fmt::Debug for Snapshot {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result603     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
604         fmt.debug_struct("Snapshot")
605             .field("is_running", &self.is_running())
606             .field("is_complete", &self.is_complete())
607             .field("is_notified", &self.is_notified())
608             .field("is_cancelled", &self.is_cancelled())
609             .field("is_join_interested", &self.is_join_interested())
610             .field("is_join_waker_set", &self.is_join_waker_set())
611             .field("ref_count", &self.ref_count())
612             .finish()
613     }
614 }
615