1 use crate::loom::sync::atomic::AtomicUsize; 2 3 use std::fmt; 4 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; 5 6 pub(super) struct State { 7 val: AtomicUsize, 8 } 9 10 /// Current state value. 11 #[derive(Copy, Clone)] 12 pub(super) struct Snapshot(usize); 13 14 type UpdateResult = Result<Snapshot, Snapshot>; 15 16 /// The task is currently being run. 17 const RUNNING: usize = 0b0001; 18 19 /// The task is complete. 20 /// 21 /// Once this bit is set, it is never unset. 22 const COMPLETE: usize = 0b0010; 23 24 /// Extracts the task's lifecycle value from the state. 25 const LIFECYCLE_MASK: usize = 0b11; 26 27 /// Flag tracking if the task has been pushed into a run queue. 28 const NOTIFIED: usize = 0b100; 29 30 /// The join handle is still around. 31 const JOIN_INTEREST: usize = 0b1_000; 32 33 /// A join handle waker has been set. 34 const JOIN_WAKER: usize = 0b10_000; 35 36 /// The task has been forcibly cancelled. 37 const CANCELLED: usize = 0b100_000; 38 39 /// All bits. 40 const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED; 41 42 /// Bits used by the ref count portion of the state. 43 const REF_COUNT_MASK: usize = !STATE_MASK; 44 45 /// Number of positions to shift the ref count. 46 const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; 47 48 /// One ref count. 49 const REF_ONE: usize = 1 << REF_COUNT_SHIFT; 50 51 /// State a task is initialized with. 52 /// 53 /// A task is initialized with three references: 54 /// 55 /// * A reference that will be stored in an `OwnedTasks` or `LocalOwnedTasks`. 56 /// * A reference that will be sent to the scheduler as an ordinary notification. 57 /// * A reference for the `JoinHandle`. 58 /// 59 /// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set. 60 /// As the task starts with a `Notified`, `NOTIFIED` is set. 61 const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED; 62 63 #[must_use] 64 pub(super) enum TransitionToRunning { 65 Success, 66 Cancelled, 67 Failed, 68 Dealloc, 69 } 70 71 #[must_use] 72 pub(super) enum TransitionToIdle { 73 Ok, 74 OkNotified, 75 OkDealloc, 76 Cancelled, 77 } 78 79 #[must_use] 80 pub(super) enum TransitionToNotifiedByVal { 81 DoNothing, 82 Submit, 83 Dealloc, 84 } 85 86 #[must_use] 87 pub(crate) enum TransitionToNotifiedByRef { 88 DoNothing, 89 Submit, 90 } 91 92 /// All transitions are performed via RMW operations. This establishes an 93 /// unambiguous modification order. 94 impl State { 95 /// Returns a task's initial state. new() -> State96 pub(super) fn new() -> State { 97 // The raw task returned by this method has a ref-count of three. See 98 // the comment on INITIAL_STATE for more. 99 State { 100 val: AtomicUsize::new(INITIAL_STATE), 101 } 102 } 103 104 /// Loads the current state, establishes `Acquire` ordering. load(&self) -> Snapshot105 pub(super) fn load(&self) -> Snapshot { 106 Snapshot(self.val.load(Acquire)) 107 } 108 109 /// Attempts to transition the lifecycle to `Running`. This sets the 110 /// notified bit to false so notifications during the poll can be detected. transition_to_running(&self) -> TransitionToRunning111 pub(super) fn transition_to_running(&self) -> TransitionToRunning { 112 self.fetch_update_action(|mut next| { 113 let action; 114 assert!(next.is_notified()); 115 116 if !next.is_idle() { 117 // This happens if the task is either currently running or if it 118 // has already completed, e.g. if it was cancelled during 119 // shutdown. Consume the ref-count and return. 120 next.ref_dec(); 121 if next.ref_count() == 0 { 122 action = TransitionToRunning::Dealloc; 123 } else { 124 action = TransitionToRunning::Failed; 125 } 126 } else { 127 // We are able to lock the RUNNING bit. 128 next.set_running(); 129 next.unset_notified(); 130 131 if next.is_cancelled() { 132 action = TransitionToRunning::Cancelled; 133 } else { 134 action = TransitionToRunning::Success; 135 } 136 } 137 (action, Some(next)) 138 }) 139 } 140 141 /// Transitions the task from `Running` -> `Idle`. 142 /// 143 /// The transition to `Idle` fails if the task has been flagged to be 144 /// cancelled. transition_to_idle(&self) -> TransitionToIdle145 pub(super) fn transition_to_idle(&self) -> TransitionToIdle { 146 self.fetch_update_action(|curr| { 147 assert!(curr.is_running()); 148 149 if curr.is_cancelled() { 150 return (TransitionToIdle::Cancelled, None); 151 } 152 153 let mut next = curr; 154 let action; 155 next.unset_running(); 156 157 if !next.is_notified() { 158 // Polling the future consumes the ref-count of the Notified. 159 next.ref_dec(); 160 if next.ref_count() == 0 { 161 action = TransitionToIdle::OkDealloc; 162 } else { 163 action = TransitionToIdle::Ok; 164 } 165 } else { 166 // The caller will schedule a new notification, so we create a 167 // new ref-count for the notification. Our own ref-count is kept 168 // for now, and the caller will drop it shortly. 169 next.ref_inc(); 170 action = TransitionToIdle::OkNotified; 171 } 172 173 (action, Some(next)) 174 }) 175 } 176 177 /// Transitions the task from `Running` -> `Complete`. transition_to_complete(&self) -> Snapshot178 pub(super) fn transition_to_complete(&self) -> Snapshot { 179 const DELTA: usize = RUNNING | COMPLETE; 180 181 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); 182 assert!(prev.is_running()); 183 assert!(!prev.is_complete()); 184 185 Snapshot(prev.0 ^ DELTA) 186 } 187 188 /// Transitions from `Complete` -> `Terminal`, decrementing the reference 189 /// count the specified number of times. 190 /// 191 /// Returns true if the task should be deallocated. transition_to_terminal(&self, count: usize) -> bool192 pub(super) fn transition_to_terminal(&self, count: usize) -> bool { 193 let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel)); 194 assert!( 195 prev.ref_count() >= count, 196 "current: {}, sub: {}", 197 prev.ref_count(), 198 count 199 ); 200 prev.ref_count() == count 201 } 202 203 /// Transitions the state to `NOTIFIED`. 204 /// 205 /// If no task needs to be submitted, a ref-count is consumed. 206 /// 207 /// If a task needs to be submitted, the ref-count is incremented for the 208 /// new Notified. transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal209 pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal { 210 self.fetch_update_action(|mut snapshot| { 211 let action; 212 213 if snapshot.is_running() { 214 // If the task is running, we mark it as notified, but we should 215 // not submit anything as the thread currently running the 216 // future is responsible for that. 217 snapshot.set_notified(); 218 snapshot.ref_dec(); 219 220 // The thread that set the running bit also holds a ref-count. 221 assert!(snapshot.ref_count() > 0); 222 223 action = TransitionToNotifiedByVal::DoNothing; 224 } else if snapshot.is_complete() || snapshot.is_notified() { 225 // We do not need to submit any notifications, but we have to 226 // decrement the ref-count. 227 snapshot.ref_dec(); 228 229 if snapshot.ref_count() == 0 { 230 action = TransitionToNotifiedByVal::Dealloc; 231 } else { 232 action = TransitionToNotifiedByVal::DoNothing; 233 } 234 } else { 235 // We create a new notified that we can submit. The caller 236 // retains ownership of the ref-count they passed in. 237 snapshot.set_notified(); 238 snapshot.ref_inc(); 239 action = TransitionToNotifiedByVal::Submit; 240 } 241 242 (action, Some(snapshot)) 243 }) 244 } 245 246 /// Transitions the state to `NOTIFIED`. transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef247 pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef { 248 self.fetch_update_action(|mut snapshot| { 249 if snapshot.is_complete() || snapshot.is_notified() { 250 // There is nothing to do in this case. 251 (TransitionToNotifiedByRef::DoNothing, None) 252 } else if snapshot.is_running() { 253 // If the task is running, we mark it as notified, but we should 254 // not submit as the thread currently running the future is 255 // responsible for that. 256 snapshot.set_notified(); 257 (TransitionToNotifiedByRef::DoNothing, Some(snapshot)) 258 } else { 259 // The task is idle and not notified. We should submit a 260 // notification. 261 snapshot.set_notified(); 262 snapshot.ref_inc(); 263 (TransitionToNotifiedByRef::Submit, Some(snapshot)) 264 } 265 }) 266 } 267 268 /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref 269 /// count. 270 /// 271 /// Returns `true` if the notified bit was transitioned from `0` to `1`; 272 /// otherwise `false.` 273 #[cfg(all( 274 tokio_unstable, 275 tokio_taskdump, 276 feature = "rt", 277 target_os = "linux", 278 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") 279 ))] transition_to_notified_for_tracing(&self) -> bool280 pub(super) fn transition_to_notified_for_tracing(&self) -> bool { 281 self.fetch_update_action(|mut snapshot| { 282 if snapshot.is_notified() { 283 (false, None) 284 } else { 285 snapshot.set_notified(); 286 snapshot.ref_inc(); 287 (true, Some(snapshot)) 288 } 289 }) 290 } 291 292 /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle. 293 /// 294 /// Returns `true` if the task needs to be submitted to the pool for 295 /// execution. transition_to_notified_and_cancel(&self) -> bool296 pub(super) fn transition_to_notified_and_cancel(&self) -> bool { 297 self.fetch_update_action(|mut snapshot| { 298 if snapshot.is_cancelled() || snapshot.is_complete() { 299 // Aborts to completed or cancelled tasks are no-ops. 300 (false, None) 301 } else if snapshot.is_running() { 302 // If the task is running, we mark it as cancelled. The thread 303 // running the task will notice the cancelled bit when it 304 // stops polling and it will kill the task. 305 // 306 // The set_notified() call is not strictly necessary but it will 307 // in some cases let a wake_by_ref call return without having 308 // to perform a compare_exchange. 309 snapshot.set_notified(); 310 snapshot.set_cancelled(); 311 (false, Some(snapshot)) 312 } else { 313 // The task is idle. We set the cancelled and notified bits and 314 // submit a notification if the notified bit was not already 315 // set. 316 snapshot.set_cancelled(); 317 if !snapshot.is_notified() { 318 snapshot.set_notified(); 319 snapshot.ref_inc(); 320 (true, Some(snapshot)) 321 } else { 322 (false, Some(snapshot)) 323 } 324 } 325 }) 326 } 327 328 /// Sets the `CANCELLED` bit and attempts to transition to `Running`. 329 /// 330 /// Returns `true` if the transition to `Running` succeeded. transition_to_shutdown(&self) -> bool331 pub(super) fn transition_to_shutdown(&self) -> bool { 332 let mut prev = Snapshot(0); 333 334 let _ = self.fetch_update(|mut snapshot| { 335 prev = snapshot; 336 337 if snapshot.is_idle() { 338 snapshot.set_running(); 339 } 340 341 // If the task was not idle, the thread currently running the task 342 // will notice the cancelled bit and cancel it once the poll 343 // completes. 344 snapshot.set_cancelled(); 345 Some(snapshot) 346 }); 347 348 prev.is_idle() 349 } 350 351 /// Optimistically tries to swap the state assuming the join handle is 352 /// __immediately__ dropped on spawn. drop_join_handle_fast(&self) -> Result<(), ()>353 pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> { 354 use std::sync::atomic::Ordering::Relaxed; 355 356 // Relaxed is acceptable as if this function is called and succeeds, 357 // then nothing has been done w/ the join handle. 358 // 359 // The moment the join handle is used (polled), the `JOIN_WAKER` flag is 360 // set, at which point the CAS will fail. 361 // 362 // Given this, there is no risk if this operation is reordered. 363 self.val 364 .compare_exchange_weak( 365 INITIAL_STATE, 366 (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST, 367 Release, 368 Relaxed, 369 ) 370 .map(|_| ()) 371 .map_err(|_| ()) 372 } 373 374 /// Tries to unset the `JOIN_INTEREST` flag. 375 /// 376 /// Returns `Ok` if the operation happens before the task transitions to a 377 /// completed state, `Err` otherwise. unset_join_interested(&self) -> UpdateResult378 pub(super) fn unset_join_interested(&self) -> UpdateResult { 379 self.fetch_update(|curr| { 380 assert!(curr.is_join_interested()); 381 382 if curr.is_complete() { 383 return None; 384 } 385 386 let mut next = curr; 387 next.unset_join_interested(); 388 389 Some(next) 390 }) 391 } 392 393 /// Sets the `JOIN_WAKER` bit. 394 /// 395 /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if 396 /// the task has completed. set_join_waker(&self) -> UpdateResult397 pub(super) fn set_join_waker(&self) -> UpdateResult { 398 self.fetch_update(|curr| { 399 assert!(curr.is_join_interested()); 400 assert!(!curr.is_join_waker_set()); 401 402 if curr.is_complete() { 403 return None; 404 } 405 406 let mut next = curr; 407 next.set_join_waker(); 408 409 Some(next) 410 }) 411 } 412 413 /// Unsets the `JOIN_WAKER` bit. 414 /// 415 /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if 416 /// the task has completed. unset_waker(&self) -> UpdateResult417 pub(super) fn unset_waker(&self) -> UpdateResult { 418 self.fetch_update(|curr| { 419 assert!(curr.is_join_interested()); 420 assert!(curr.is_join_waker_set()); 421 422 if curr.is_complete() { 423 return None; 424 } 425 426 let mut next = curr; 427 next.unset_join_waker(); 428 429 Some(next) 430 }) 431 } 432 ref_inc(&self)433 pub(super) fn ref_inc(&self) { 434 use std::process; 435 use std::sync::atomic::Ordering::Relaxed; 436 437 // Using a relaxed ordering is alright here, as knowledge of the 438 // original reference prevents other threads from erroneously deleting 439 // the object. 440 // 441 // As explained in the [Boost documentation][1], Increasing the 442 // reference counter can always be done with memory_order_relaxed: New 443 // references to an object can only be formed from an existing 444 // reference, and passing an existing reference from one thread to 445 // another must already provide any required synchronization. 446 // 447 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) 448 let prev = self.val.fetch_add(REF_ONE, Relaxed); 449 450 // If the reference count overflowed, abort. 451 if prev > isize::MAX as usize { 452 process::abort(); 453 } 454 } 455 456 /// Returns `true` if the task should be released. ref_dec(&self) -> bool457 pub(super) fn ref_dec(&self) -> bool { 458 let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel)); 459 assert!(prev.ref_count() >= 1); 460 prev.ref_count() == 1 461 } 462 463 /// Returns `true` if the task should be released. ref_dec_twice(&self) -> bool464 pub(super) fn ref_dec_twice(&self) -> bool { 465 let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel)); 466 assert!(prev.ref_count() >= 2); 467 prev.ref_count() == 2 468 } 469 fetch_update_action<F, T>(&self, mut f: F) -> T where F: FnMut(Snapshot) -> (T, Option<Snapshot>),470 fn fetch_update_action<F, T>(&self, mut f: F) -> T 471 where 472 F: FnMut(Snapshot) -> (T, Option<Snapshot>), 473 { 474 let mut curr = self.load(); 475 476 loop { 477 let (output, next) = f(curr); 478 let next = match next { 479 Some(next) => next, 480 None => return output, 481 }; 482 483 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); 484 485 match res { 486 Ok(_) => return output, 487 Err(actual) => curr = Snapshot(actual), 488 } 489 } 490 } 491 fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> where F: FnMut(Snapshot) -> Option<Snapshot>,492 fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> 493 where 494 F: FnMut(Snapshot) -> Option<Snapshot>, 495 { 496 let mut curr = self.load(); 497 498 loop { 499 let next = match f(curr) { 500 Some(next) => next, 501 None => return Err(curr), 502 }; 503 504 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); 505 506 match res { 507 Ok(_) => return Ok(next), 508 Err(actual) => curr = Snapshot(actual), 509 } 510 } 511 } 512 } 513 514 // ===== impl Snapshot ===== 515 516 impl Snapshot { 517 /// Returns `true` if the task is in an idle state. is_idle(self) -> bool518 pub(super) fn is_idle(self) -> bool { 519 self.0 & (RUNNING | COMPLETE) == 0 520 } 521 522 /// Returns `true` if the task has been flagged as notified. is_notified(self) -> bool523 pub(super) fn is_notified(self) -> bool { 524 self.0 & NOTIFIED == NOTIFIED 525 } 526 unset_notified(&mut self)527 fn unset_notified(&mut self) { 528 self.0 &= !NOTIFIED; 529 } 530 set_notified(&mut self)531 fn set_notified(&mut self) { 532 self.0 |= NOTIFIED; 533 } 534 is_running(self) -> bool535 pub(super) fn is_running(self) -> bool { 536 self.0 & RUNNING == RUNNING 537 } 538 set_running(&mut self)539 fn set_running(&mut self) { 540 self.0 |= RUNNING; 541 } 542 unset_running(&mut self)543 fn unset_running(&mut self) { 544 self.0 &= !RUNNING; 545 } 546 is_cancelled(self) -> bool547 pub(super) fn is_cancelled(self) -> bool { 548 self.0 & CANCELLED == CANCELLED 549 } 550 set_cancelled(&mut self)551 fn set_cancelled(&mut self) { 552 self.0 |= CANCELLED; 553 } 554 555 /// Returns `true` if the task's future has completed execution. is_complete(self) -> bool556 pub(super) fn is_complete(self) -> bool { 557 self.0 & COMPLETE == COMPLETE 558 } 559 is_join_interested(self) -> bool560 pub(super) fn is_join_interested(self) -> bool { 561 self.0 & JOIN_INTEREST == JOIN_INTEREST 562 } 563 unset_join_interested(&mut self)564 fn unset_join_interested(&mut self) { 565 self.0 &= !JOIN_INTEREST; 566 } 567 is_join_waker_set(self) -> bool568 pub(super) fn is_join_waker_set(self) -> bool { 569 self.0 & JOIN_WAKER == JOIN_WAKER 570 } 571 set_join_waker(&mut self)572 fn set_join_waker(&mut self) { 573 self.0 |= JOIN_WAKER; 574 } 575 unset_join_waker(&mut self)576 fn unset_join_waker(&mut self) { 577 self.0 &= !JOIN_WAKER; 578 } 579 ref_count(self) -> usize580 pub(super) fn ref_count(self) -> usize { 581 (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT 582 } 583 ref_inc(&mut self)584 fn ref_inc(&mut self) { 585 assert!(self.0 <= isize::MAX as usize); 586 self.0 += REF_ONE; 587 } 588 ref_dec(&mut self)589 pub(super) fn ref_dec(&mut self) { 590 assert!(self.ref_count() > 0); 591 self.0 -= REF_ONE; 592 } 593 } 594 595 impl fmt::Debug for State { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result596 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 597 let snapshot = self.load(); 598 snapshot.fmt(fmt) 599 } 600 } 601 602 impl fmt::Debug for Snapshot { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result603 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 604 fmt.debug_struct("Snapshot") 605 .field("is_running", &self.is_running()) 606 .field("is_complete", &self.is_complete()) 607 .field("is_notified", &self.is_notified()) 608 .field("is_cancelled", &self.is_cancelled()) 609 .field("is_join_interested", &self.is_join_interested()) 610 .field("is_join_waker_set", &self.is_join_waker_set()) 611 .field("ref_count", &self.ref_count()) 612 .finish() 613 } 614 } 615