1 //! Timer state structures. 2 //! 3 //! This module contains the heart of the intrusive timer implementation, and as 4 //! such the structures inside are full of tricky concurrency and unsafe code. 5 //! 6 //! # Ground rules 7 //! 8 //! The heart of the timer implementation here is the [`TimerShared`] structure, 9 //! shared between the [`TimerEntry`] and the driver. Generally, we permit access 10 //! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or 11 //! 2) a held driver lock. 12 //! 13 //! It follows from this that any changes made while holding BOTH 1 and 2 will 14 //! be reliably visible, regardless of ordering. This is because of the `acq/rel` 15 //! fences on the driver lock ensuring ordering with 2, and rust mutable 16 //! reference rules for 1 (a mutable reference to an object can't be passed 17 //! between threads without an `acq/rel` barrier, and same-thread we have local 18 //! happens-before ordering). 19 //! 20 //! # State field 21 //! 22 //! Each timer has a state field associated with it. This field contains either 23 //! the current scheduled time, or a special flag value indicating its state. 24 //! This state can either indicate that the timer is on the 'pending' queue (and 25 //! thus will be fired with an `Ok(())` result soon) or that it has already been 26 //! fired/deregistered. 27 //! 28 //! This single state field allows for code that is firing the timer to 29 //! synchronize with any racing `reset` calls reliably. 30 //! 31 //! # Cached vs true timeouts 32 //! 33 //! To allow for the use case of a timeout that is periodically reset before 34 //! expiration to be as lightweight as possible, we support optimistically 35 //! lock-free timer resets, in the case where a timer is rescheduled to a later 36 //! point than it was originally scheduled for. 37 //! 38 //! This is accomplished by lazily rescheduling timers. That is, we update the 39 //! state field with the true expiration of the timer from the holder of 40 //! the [`TimerEntry`]. When the driver services timers (ie, whenever it's 41 //! walking lists of timers), it checks this "true when" value, and reschedules 42 //! based on it. 43 //! 44 //! We do, however, also need to track what the expiration time was when we 45 //! originally registered the timer; this is used to locate the right linked 46 //! list when the timer is being cancelled. This is referred to as the "cached 47 //! when" internally. 48 //! 49 //! There is of course a race condition between timer reset and timer 50 //! expiration. If the driver fails to observe the updated expiration time, it 51 //! could trigger expiration of the timer too early. However, because 52 //! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and 53 //! refuse to mark the timer as pending. 54 //! 55 //! [mark_pending]: TimerHandle::mark_pending 56 57 use crate::loom::cell::UnsafeCell; 58 use crate::loom::sync::atomic::AtomicU64; 59 use crate::loom::sync::atomic::Ordering; 60 61 use crate::runtime::context; 62 use crate::runtime::scheduler; 63 use crate::sync::AtomicWaker; 64 use crate::time::Instant; 65 use crate::util::linked_list; 66 67 use std::cell::UnsafeCell as StdUnsafeCell; 68 use std::task::{Context, Poll, Waker}; 69 use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; 70 71 type TimerResult = Result<(), crate::time::error::Error>; 72 73 const STATE_DEREGISTERED: u64 = u64::MAX; 74 const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; 75 const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; 76 /// The largest safe integer to use for ticks. 77 /// 78 /// This value should be updated if any other signal values are added above. 79 pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1; 80 81 /// This structure holds the current shared state of the timer - its scheduled 82 /// time (if registered), or otherwise the result of the timer completing, as 83 /// well as the registered waker. 84 /// 85 /// Generally, the `StateCell` is only permitted to be accessed from two contexts: 86 /// Either a thread holding the corresponding `&mut TimerEntry`, or a thread 87 /// holding the timer driver lock. The write actions on the `StateCell` amount to 88 /// passing "ownership" of the `StateCell` between these contexts; moving a timer 89 /// from the `TimerEntry` to the driver requires _both_ holding the `&mut 90 /// TimerEntry` and the driver lock, while moving it back (firing the timer) 91 /// requires only the driver lock. 92 pub(super) struct StateCell { 93 /// Holds either the scheduled expiration time for this timer, or (if the 94 /// timer has been fired and is unregistered), `u64::MAX`. 95 state: AtomicU64, 96 /// If the timer is fired (an Acquire order read on state shows 97 /// `u64::MAX`), holds the result that should be returned from 98 /// polling the timer. Otherwise, the contents are unspecified and reading 99 /// without holding the driver lock is undefined behavior. 100 result: UnsafeCell<TimerResult>, 101 /// The currently-registered waker 102 waker: AtomicWaker, 103 } 104 105 impl Default for StateCell { default() -> Self106 fn default() -> Self { 107 Self::new() 108 } 109 } 110 111 impl std::fmt::Debug for StateCell { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 113 write!(f, "StateCell({:?})", self.read_state()) 114 } 115 } 116 117 impl StateCell { new() -> Self118 fn new() -> Self { 119 Self { 120 state: AtomicU64::new(STATE_DEREGISTERED), 121 result: UnsafeCell::new(Ok(())), 122 waker: AtomicWaker::new(), 123 } 124 } 125 is_pending(&self) -> bool126 fn is_pending(&self) -> bool { 127 self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE 128 } 129 130 /// Returns the current expiration time, or None if not currently scheduled. when(&self) -> Option<u64>131 fn when(&self) -> Option<u64> { 132 let cur_state = self.state.load(Ordering::Relaxed); 133 134 if cur_state == STATE_DEREGISTERED { 135 None 136 } else { 137 Some(cur_state) 138 } 139 } 140 141 /// If the timer is completed, returns the result of the timer. Otherwise, 142 /// returns None and registers the waker. poll(&self, waker: &Waker) -> Poll<TimerResult>143 fn poll(&self, waker: &Waker) -> Poll<TimerResult> { 144 // We must register first. This ensures that either `fire` will 145 // observe the new waker, or we will observe a racing fire to have set 146 // the state, or both. 147 self.waker.register_by_ref(waker); 148 149 self.read_state() 150 } 151 read_state(&self) -> Poll<TimerResult>152 fn read_state(&self) -> Poll<TimerResult> { 153 let cur_state = self.state.load(Ordering::Acquire); 154 155 if cur_state == STATE_DEREGISTERED { 156 // SAFETY: The driver has fired this timer; this involves writing 157 // the result, and then writing (with release ordering) the state 158 // field. 159 Poll::Ready(unsafe { self.result.with(|p| *p) }) 160 } else { 161 Poll::Pending 162 } 163 } 164 165 /// Marks this timer as being moved to the pending list, if its scheduled 166 /// time is not after `not_after`. 167 /// 168 /// If the timer is scheduled for a time after `not_after`, returns an Err 169 /// containing the current scheduled time. 170 /// 171 /// SAFETY: Must hold the driver lock. mark_pending(&self, not_after: u64) -> Result<(), u64>172 unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { 173 // Quick initial debug check to see if the timer is already fired. Since 174 // firing the timer can only happen with the driver lock held, we know 175 // we shouldn't be able to "miss" a transition to a fired state, even 176 // with relaxed ordering. 177 let mut cur_state = self.state.load(Ordering::Relaxed); 178 179 loop { 180 // improve the error message for things like 181 // https://github.com/tokio-rs/tokio/issues/3675 182 assert!( 183 cur_state < STATE_MIN_VALUE, 184 "mark_pending called when the timer entry is in an invalid state" 185 ); 186 187 if cur_state > not_after { 188 break Err(cur_state); 189 } 190 191 match self.state.compare_exchange_weak( 192 cur_state, 193 STATE_PENDING_FIRE, 194 Ordering::AcqRel, 195 Ordering::Acquire, 196 ) { 197 Ok(_) => break Ok(()), 198 Err(actual_state) => cur_state = actual_state, 199 } 200 } 201 } 202 203 /// Fires the timer, setting the result to the provided result. 204 /// 205 /// Returns: 206 /// * `Some(waker)` - if fired and a waker needs to be invoked once the 207 /// driver lock is released 208 /// * `None` - if fired and a waker does not need to be invoked, or if 209 /// already fired 210 /// 211 /// SAFETY: The driver lock must be held. fire(&self, result: TimerResult) -> Option<Waker>212 unsafe fn fire(&self, result: TimerResult) -> Option<Waker> { 213 // Quick initial check to see if the timer is already fired. Since 214 // firing the timer can only happen with the driver lock held, we know 215 // we shouldn't be able to "miss" a transition to a fired state, even 216 // with relaxed ordering. 217 let cur_state = self.state.load(Ordering::Relaxed); 218 if cur_state == STATE_DEREGISTERED { 219 return None; 220 } 221 222 // SAFETY: We assume the driver lock is held and the timer is not 223 // fired, so only the driver is accessing this field. 224 // 225 // We perform a release-ordered store to state below, to ensure this 226 // write is visible before the state update is visible. 227 unsafe { self.result.with_mut(|p| *p = result) }; 228 229 self.state.store(STATE_DEREGISTERED, Ordering::Release); 230 231 self.waker.take_waker() 232 } 233 234 /// Marks the timer as registered (poll will return None) and sets the 235 /// expiration time. 236 /// 237 /// While this function is memory-safe, it should only be called from a 238 /// context holding both `&mut TimerEntry` and the driver lock. set_expiration(&self, timestamp: u64)239 fn set_expiration(&self, timestamp: u64) { 240 debug_assert!(timestamp < STATE_MIN_VALUE); 241 242 // We can use relaxed ordering because we hold the driver lock and will 243 // fence when we release the lock. 244 self.state.store(timestamp, Ordering::Relaxed); 245 } 246 247 /// Attempts to adjust the timer to a new timestamp. 248 /// 249 /// If the timer has already been fired, is pending firing, or the new 250 /// timestamp is earlier than the old timestamp, (or occasionally 251 /// spuriously) returns Err without changing the timer's state. In this 252 /// case, the timer must be deregistered and re-registered. extend_expiration(&self, new_timestamp: u64) -> Result<(), ()>253 fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> { 254 let mut prior = self.state.load(Ordering::Relaxed); 255 loop { 256 if new_timestamp < prior || prior >= STATE_MIN_VALUE { 257 return Err(()); 258 } 259 260 match self.state.compare_exchange_weak( 261 prior, 262 new_timestamp, 263 Ordering::AcqRel, 264 Ordering::Acquire, 265 ) { 266 Ok(_) => return Ok(()), 267 Err(true_prior) => prior = true_prior, 268 } 269 } 270 } 271 272 /// Returns true if the state of this timer indicates that the timer might 273 /// be registered with the driver. This check is performed with relaxed 274 /// ordering, but is conservative - if it returns false, the timer is 275 /// definitely _not_ registered. might_be_registered(&self) -> bool276 pub(super) fn might_be_registered(&self) -> bool { 277 self.state.load(Ordering::Relaxed) != u64::MAX 278 } 279 } 280 281 /// A timer entry. 282 /// 283 /// This is the handle to a timer that is controlled by the requester of the 284 /// timer. As this participates in intrusive data structures, it must be pinned 285 /// before polling. 286 #[derive(Debug)] 287 pub(crate) struct TimerEntry { 288 /// Arc reference to the runtime handle. We can only free the driver after 289 /// deregistering everything from their respective timer wheels. 290 driver: scheduler::Handle, 291 /// Shared inner structure; this is part of an intrusive linked list, and 292 /// therefore other references can exist to it while mutable references to 293 /// Entry exist. 294 /// 295 /// This is manipulated only under the inner mutex. TODO: Can we use loom 296 /// cells for this? 297 inner: StdUnsafeCell<Option<TimerShared>>, 298 /// Deadline for the timer. This is used to register on the first 299 /// poll, as we can't register prior to being pinned. 300 deadline: Instant, 301 /// Whether the deadline has been registered. 302 registered: bool, 303 /// Ensure the type is !Unpin 304 _m: std::marker::PhantomPinned, 305 } 306 307 unsafe impl Send for TimerEntry {} 308 unsafe impl Sync for TimerEntry {} 309 310 /// An `TimerHandle` is the (non-enforced) "unique" pointer from the driver to the 311 /// timer entry. Generally, at most one `TimerHandle` exists for a timer at a time 312 /// (enforced by the timer state machine). 313 /// 314 /// SAFETY: An `TimerHandle` is essentially a raw pointer, and the usual caveats 315 /// of pointer safety apply. In particular, `TimerHandle` does not itself enforce 316 /// that the timer does still exist; however, normally an `TimerHandle` is created 317 /// immediately before registering the timer, and is consumed when firing the 318 /// timer, to help minimize mistakes. Still, because `TimerHandle` cannot enforce 319 /// memory safety, all operations are unsafe. 320 #[derive(Debug)] 321 pub(crate) struct TimerHandle { 322 inner: NonNull<TimerShared>, 323 } 324 325 pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>; 326 327 /// The shared state structure of a timer. This structure is shared between the 328 /// frontend (`Entry`) and driver backend. 329 /// 330 /// Note that this structure is located inside the `TimerEntry` structure. 331 pub(crate) struct TimerShared { 332 /// The shard id. We should never change it. 333 shard_id: u32, 334 /// A link within the doubly-linked list of timers on a particular level and 335 /// slot. Valid only if state is equal to Registered. 336 /// 337 /// Only accessed under the entry lock. 338 pointers: linked_list::Pointers<TimerShared>, 339 340 /// The expiration time for which this entry is currently registered. 341 /// Generally owned by the driver, but is accessed by the entry when not 342 /// registered. 343 cached_when: AtomicU64, 344 345 /// Current state. This records whether the timer entry is currently under 346 /// the ownership of the driver, and if not, its current state (not 347 /// complete, fired, error, etc). 348 state: StateCell, 349 350 _p: PhantomPinned, 351 } 352 353 unsafe impl Send for TimerShared {} 354 unsafe impl Sync for TimerShared {} 355 356 impl std::fmt::Debug for TimerShared { fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 358 f.debug_struct("TimerShared") 359 .field("cached_when", &self.cached_when.load(Ordering::Relaxed)) 360 .field("state", &self.state) 361 .finish() 362 } 363 } 364 365 generate_addr_of_methods! { 366 impl<> TimerShared { 367 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<TimerShared>> { 368 &self.pointers 369 } 370 } 371 } 372 373 impl TimerShared { new(shard_id: u32) -> Self374 pub(super) fn new(shard_id: u32) -> Self { 375 Self { 376 shard_id, 377 cached_when: AtomicU64::new(0), 378 pointers: linked_list::Pointers::new(), 379 state: StateCell::default(), 380 _p: PhantomPinned, 381 } 382 } 383 384 /// Gets the cached time-of-expiration value. cached_when(&self) -> u64385 pub(super) fn cached_when(&self) -> u64 { 386 // Cached-when is only accessed under the driver lock, so we can use relaxed 387 self.cached_when.load(Ordering::Relaxed) 388 } 389 390 /// Gets the true time-of-expiration value, and copies it into the cached 391 /// time-of-expiration value. 392 /// 393 /// SAFETY: Must be called with the driver lock held, and when this entry is 394 /// not in any timer wheel lists. sync_when(&self) -> u64395 pub(super) unsafe fn sync_when(&self) -> u64 { 396 let true_when = self.true_when(); 397 398 self.cached_when.store(true_when, Ordering::Relaxed); 399 400 true_when 401 } 402 403 /// Sets the cached time-of-expiration value. 404 /// 405 /// SAFETY: Must be called with the driver lock held, and when this entry is 406 /// not in any timer wheel lists. set_cached_when(&self, when: u64)407 unsafe fn set_cached_when(&self, when: u64) { 408 self.cached_when.store(when, Ordering::Relaxed); 409 } 410 411 /// Returns the true time-of-expiration value, with relaxed memory ordering. true_when(&self) -> u64412 pub(super) fn true_when(&self) -> u64 { 413 self.state.when().expect("Timer already fired") 414 } 415 416 /// Sets the true time-of-expiration value, even if it is less than the 417 /// current expiration or the timer is deregistered. 418 /// 419 /// SAFETY: Must only be called with the driver lock held and the entry not 420 /// in the timer wheel. set_expiration(&self, t: u64)421 pub(super) unsafe fn set_expiration(&self, t: u64) { 422 self.state.set_expiration(t); 423 self.cached_when.store(t, Ordering::Relaxed); 424 } 425 426 /// Sets the true time-of-expiration only if it is after the current. extend_expiration(&self, t: u64) -> Result<(), ()>427 pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> { 428 self.state.extend_expiration(t) 429 } 430 431 /// Returns a `TimerHandle` for this timer. handle(&self) -> TimerHandle432 pub(super) fn handle(&self) -> TimerHandle { 433 TimerHandle { 434 inner: NonNull::from(self), 435 } 436 } 437 438 /// Returns true if the state of this timer indicates that the timer might 439 /// be registered with the driver. This check is performed with relaxed 440 /// ordering, but is conservative - if it returns false, the timer is 441 /// definitely _not_ registered. might_be_registered(&self) -> bool442 pub(super) fn might_be_registered(&self) -> bool { 443 self.state.might_be_registered() 444 } 445 446 /// Gets the shard id. shard_id(&self) -> u32447 pub(super) fn shard_id(&self) -> u32 { 448 self.shard_id 449 } 450 } 451 452 unsafe impl linked_list::Link for TimerShared { 453 type Handle = TimerHandle; 454 455 type Target = TimerShared; 456 as_raw(handle: &Self::Handle) -> NonNull<Self::Target>457 fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> { 458 handle.inner 459 } 460 from_raw(ptr: NonNull<Self::Target>) -> Self::Handle461 unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle { 462 TimerHandle { inner: ptr } 463 } 464 pointers( target: NonNull<Self::Target>, ) -> NonNull<linked_list::Pointers<Self::Target>>465 unsafe fn pointers( 466 target: NonNull<Self::Target>, 467 ) -> NonNull<linked_list::Pointers<Self::Target>> { 468 TimerShared::addr_of_pointers(target) 469 } 470 } 471 472 // ===== impl Entry ===== 473 474 impl TimerEntry { 475 #[track_caller] new(handle: scheduler::Handle, deadline: Instant) -> Self476 pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self { 477 // Panic if the time driver is not enabled 478 let _ = handle.driver().time(); 479 480 Self { 481 driver: handle, 482 inner: StdUnsafeCell::new(None), 483 deadline, 484 registered: false, 485 _m: std::marker::PhantomPinned, 486 } 487 } 488 is_inner_init(&self) -> bool489 fn is_inner_init(&self) -> bool { 490 unsafe { &*self.inner.get() }.is_some() 491 } 492 493 // This lazy initialization is for performance purposes. inner(&self) -> &TimerShared494 fn inner(&self) -> &TimerShared { 495 let inner = unsafe { &*self.inner.get() }; 496 if inner.is_none() { 497 let shard_size = self.driver.driver().time().inner.get_shard_size(); 498 let shard_id = generate_shard_id(shard_size); 499 unsafe { 500 *self.inner.get() = Some(TimerShared::new(shard_id)); 501 } 502 } 503 return inner.as_ref().unwrap(); 504 } 505 deadline(&self) -> Instant506 pub(crate) fn deadline(&self) -> Instant { 507 self.deadline 508 } 509 is_elapsed(&self) -> bool510 pub(crate) fn is_elapsed(&self) -> bool { 511 self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered 512 } 513 514 /// Cancels and deregisters the timer. This operation is irreversible. cancel(self: Pin<&mut Self>)515 pub(crate) fn cancel(self: Pin<&mut Self>) { 516 // Avoid calling the `clear_entry` method, because it has not been initialized yet. 517 if !self.is_inner_init() { 518 return; 519 } 520 // We need to perform an acq/rel fence with the driver thread, and the 521 // simplest way to do so is to grab the driver lock. 522 // 523 // Why is this necessary? We're about to release this timer's memory for 524 // some other non-timer use. However, we've been doing a bunch of 525 // relaxed (or even non-atomic) writes from the driver thread, and we'll 526 // be doing more from _this thread_ (as this memory is interpreted as 527 // something else). 528 // 529 // It is critical to ensure that, from the point of view of the driver, 530 // those future non-timer writes happen-after the timer is fully fired, 531 // and from the purpose of this thread, the driver's writes all 532 // happen-before we drop the timer. This in turn requires us to perform 533 // an acquire-release barrier in _both_ directions between the driver 534 // and dropping thread. 535 // 536 // The lock acquisition in clear_entry serves this purpose. All of the 537 // driver manipulations happen with the lock held, so we can just take 538 // the lock and be sure that this drop happens-after everything the 539 // driver did so far and happens-before everything the driver does in 540 // the future. While we have the lock held, we also go ahead and 541 // deregister the entry if necessary. 542 unsafe { self.driver().clear_entry(NonNull::from(self.inner())) }; 543 } 544 reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool)545 pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { 546 let this = unsafe { self.as_mut().get_unchecked_mut() }; 547 this.deadline = new_time; 548 this.registered = reregister; 549 550 let tick = self.driver().time_source().deadline_to_tick(new_time); 551 552 if self.inner().extend_expiration(tick).is_ok() { 553 return; 554 } 555 556 if reregister { 557 unsafe { 558 self.driver() 559 .reregister(&self.driver.driver().io, tick, self.inner().into()); 560 } 561 } 562 } 563 poll_elapsed( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), super::Error>>564 pub(crate) fn poll_elapsed( 565 mut self: Pin<&mut Self>, 566 cx: &mut Context<'_>, 567 ) -> Poll<Result<(), super::Error>> { 568 assert!( 569 !self.driver().is_shutdown(), 570 "{}", 571 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR 572 ); 573 574 if !self.registered { 575 let deadline = self.deadline; 576 self.as_mut().reset(deadline, true); 577 } 578 579 self.inner().state.poll(cx.waker()) 580 } 581 driver(&self) -> &super::Handle582 pub(crate) fn driver(&self) -> &super::Handle { 583 self.driver.driver().time() 584 } 585 586 #[cfg(all(tokio_unstable, feature = "tracing"))] clock(&self) -> &super::Clock587 pub(crate) fn clock(&self) -> &super::Clock { 588 self.driver.driver().clock() 589 } 590 } 591 592 impl TimerHandle { cached_when(&self) -> u64593 pub(super) unsafe fn cached_when(&self) -> u64 { 594 unsafe { self.inner.as_ref().cached_when() } 595 } 596 sync_when(&self) -> u64597 pub(super) unsafe fn sync_when(&self) -> u64 { 598 unsafe { self.inner.as_ref().sync_when() } 599 } 600 is_pending(&self) -> bool601 pub(super) unsafe fn is_pending(&self) -> bool { 602 unsafe { self.inner.as_ref().state.is_pending() } 603 } 604 605 /// Forcibly sets the true and cached expiration times to the given tick. 606 /// 607 /// SAFETY: The caller must ensure that the handle remains valid, the driver 608 /// lock is held, and that the timer is not in any wheel linked lists. set_expiration(&self, tick: u64)609 pub(super) unsafe fn set_expiration(&self, tick: u64) { 610 self.inner.as_ref().set_expiration(tick); 611 } 612 613 /// Attempts to mark this entry as pending. If the expiration time is after 614 /// `not_after`, however, returns an Err with the current expiration time. 615 /// 616 /// If an `Err` is returned, the `cached_when` value will be updated to this 617 /// new expiration time. 618 /// 619 /// SAFETY: The caller must ensure that the handle remains valid, the driver 620 /// lock is held, and that the timer is not in any wheel linked lists. 621 /// After returning Ok, the entry must be added to the pending list. mark_pending(&self, not_after: u64) -> Result<(), u64>622 pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { 623 match self.inner.as_ref().state.mark_pending(not_after) { 624 Ok(()) => { 625 // mark this as being on the pending queue in cached_when 626 self.inner.as_ref().set_cached_when(u64::MAX); 627 Ok(()) 628 } 629 Err(tick) => { 630 self.inner.as_ref().set_cached_when(tick); 631 Err(tick) 632 } 633 } 634 } 635 636 /// Attempts to transition to a terminal state. If the state is already a 637 /// terminal state, does nothing. 638 /// 639 /// Because the entry might be dropped after the state is moved to a 640 /// terminal state, this function consumes the handle to ensure we don't 641 /// access the entry afterwards. 642 /// 643 /// Returns the last-registered waker, if any. 644 /// 645 /// SAFETY: The driver lock must be held while invoking this function, and 646 /// the entry must not be in any wheel linked lists. fire(self, completed_state: TimerResult) -> Option<Waker>647 pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> { 648 self.inner.as_ref().state.fire(completed_state) 649 } 650 } 651 652 impl Drop for TimerEntry { drop(&mut self)653 fn drop(&mut self) { 654 unsafe { Pin::new_unchecked(self) }.as_mut().cancel(); 655 } 656 } 657 658 // Generates a shard id. If current thread is a worker thread, we use its worker index as a shard id. 659 // Otherwise, we use a random number generator to obtain the shard id. 660 cfg_rt! { 661 fn generate_shard_id(shard_size: u32) -> u32 { 662 let id = context::with_scheduler(|ctx| match ctx { 663 Some(scheduler::Context::CurrentThread(_ctx)) => 0, 664 #[cfg(feature = "rt-multi-thread")] 665 Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, 666 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 667 Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, 668 None => context::thread_rng_n(shard_size), 669 }); 670 id % shard_size 671 } 672 } 673 674 cfg_not_rt! { 675 fn generate_shard_id(shard_size: u32) -> u32 { 676 context::thread_rng_n(shard_size) 677 } 678 } 679