1 //! Timer state structures.
2 //!
3 //! This module contains the heart of the intrusive timer implementation, and as
4 //! such the structures inside are full of tricky concurrency and unsafe code.
5 //!
6 //! # Ground rules
7 //!
8 //! The heart of the timer implementation here is the [`TimerShared`] structure,
9 //! shared between the [`TimerEntry`] and the driver. Generally, we permit access
10 //! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or
11 //! 2) a held driver lock.
12 //!
13 //! It follows from this that any changes made while holding BOTH 1 and 2 will
14 //! be reliably visible, regardless of ordering. This is because of the `acq/rel`
15 //! fences on the driver lock ensuring ordering with 2, and rust mutable
16 //! reference rules for 1 (a mutable reference to an object can't be passed
17 //! between threads without an `acq/rel` barrier, and same-thread we have local
18 //! happens-before ordering).
19 //!
20 //! # State field
21 //!
22 //! Each timer has a state field associated with it. This field contains either
23 //! the current scheduled time, or a special flag value indicating its state.
24 //! This state can either indicate that the timer is on the 'pending' queue (and
25 //! thus will be fired with an `Ok(())` result soon) or that it has already been
26 //! fired/deregistered.
27 //!
28 //! This single state field allows for code that is firing the timer to
29 //! synchronize with any racing `reset` calls reliably.
30 //!
31 //! # Cached vs true timeouts
32 //!
33 //! To allow for the use case of a timeout that is periodically reset before
34 //! expiration to be as lightweight as possible, we support optimistically
35 //! lock-free timer resets, in the case where a timer is rescheduled to a later
36 //! point than it was originally scheduled for.
37 //!
38 //! This is accomplished by lazily rescheduling timers. That is, we update the
39 //! state field with the true expiration of the timer from the holder of
40 //! the [`TimerEntry`]. When the driver services timers (ie, whenever it's
41 //! walking lists of timers), it checks this "true when" value, and reschedules
42 //! based on it.
43 //!
44 //! We do, however, also need to track what the expiration time was when we
45 //! originally registered the timer; this is used to locate the right linked
46 //! list when the timer is being cancelled. This is referred to as the "cached
47 //! when" internally.
48 //!
49 //! There is of course a race condition between timer reset and timer
50 //! expiration. If the driver fails to observe the updated expiration time, it
51 //! could trigger expiration of the timer too early. However, because
52 //! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and
53 //! refuse to mark the timer as pending.
54 //!
55 //! [mark_pending]: TimerHandle::mark_pending
56 
57 use crate::loom::cell::UnsafeCell;
58 use crate::loom::sync::atomic::AtomicU64;
59 use crate::loom::sync::atomic::Ordering;
60 
61 use crate::runtime::context;
62 use crate::runtime::scheduler;
63 use crate::sync::AtomicWaker;
64 use crate::time::Instant;
65 use crate::util::linked_list;
66 
67 use std::cell::UnsafeCell as StdUnsafeCell;
68 use std::task::{Context, Poll, Waker};
69 use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
70 
71 type TimerResult = Result<(), crate::time::error::Error>;
72 
73 const STATE_DEREGISTERED: u64 = u64::MAX;
74 const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
75 const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
76 /// The largest safe integer to use for ticks.
77 ///
78 /// This value should be updated if any other signal values are added above.
79 pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1;
80 
81 /// This structure holds the current shared state of the timer - its scheduled
82 /// time (if registered), or otherwise the result of the timer completing, as
83 /// well as the registered waker.
84 ///
85 /// Generally, the `StateCell` is only permitted to be accessed from two contexts:
86 /// Either a thread holding the corresponding `&mut TimerEntry`, or a thread
87 /// holding the timer driver lock. The write actions on the `StateCell` amount to
88 /// passing "ownership" of the `StateCell` between these contexts; moving a timer
89 /// from the `TimerEntry` to the driver requires _both_ holding the `&mut
90 /// TimerEntry` and the driver lock, while moving it back (firing the timer)
91 /// requires only the driver lock.
92 pub(super) struct StateCell {
93     /// Holds either the scheduled expiration time for this timer, or (if the
94     /// timer has been fired and is unregistered), `u64::MAX`.
95     state: AtomicU64,
96     /// If the timer is fired (an Acquire order read on state shows
97     /// `u64::MAX`), holds the result that should be returned from
98     /// polling the timer. Otherwise, the contents are unspecified and reading
99     /// without holding the driver lock is undefined behavior.
100     result: UnsafeCell<TimerResult>,
101     /// The currently-registered waker
102     waker: AtomicWaker,
103 }
104 
105 impl Default for StateCell {
default() -> Self106     fn default() -> Self {
107         Self::new()
108     }
109 }
110 
111 impl std::fmt::Debug for StateCell {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result112     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113         write!(f, "StateCell({:?})", self.read_state())
114     }
115 }
116 
117 impl StateCell {
new() -> Self118     fn new() -> Self {
119         Self {
120             state: AtomicU64::new(STATE_DEREGISTERED),
121             result: UnsafeCell::new(Ok(())),
122             waker: AtomicWaker::new(),
123         }
124     }
125 
is_pending(&self) -> bool126     fn is_pending(&self) -> bool {
127         self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
128     }
129 
130     /// Returns the current expiration time, or None if not currently scheduled.
when(&self) -> Option<u64>131     fn when(&self) -> Option<u64> {
132         let cur_state = self.state.load(Ordering::Relaxed);
133 
134         if cur_state == STATE_DEREGISTERED {
135             None
136         } else {
137             Some(cur_state)
138         }
139     }
140 
141     /// If the timer is completed, returns the result of the timer. Otherwise,
142     /// returns None and registers the waker.
poll(&self, waker: &Waker) -> Poll<TimerResult>143     fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
144         // We must register first. This ensures that either `fire` will
145         // observe the new waker, or we will observe a racing fire to have set
146         // the state, or both.
147         self.waker.register_by_ref(waker);
148 
149         self.read_state()
150     }
151 
read_state(&self) -> Poll<TimerResult>152     fn read_state(&self) -> Poll<TimerResult> {
153         let cur_state = self.state.load(Ordering::Acquire);
154 
155         if cur_state == STATE_DEREGISTERED {
156             // SAFETY: The driver has fired this timer; this involves writing
157             // the result, and then writing (with release ordering) the state
158             // field.
159             Poll::Ready(unsafe { self.result.with(|p| *p) })
160         } else {
161             Poll::Pending
162         }
163     }
164 
165     /// Marks this timer as being moved to the pending list, if its scheduled
166     /// time is not after `not_after`.
167     ///
168     /// If the timer is scheduled for a time after `not_after`, returns an Err
169     /// containing the current scheduled time.
170     ///
171     /// SAFETY: Must hold the driver lock.
mark_pending(&self, not_after: u64) -> Result<(), u64>172     unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
173         // Quick initial debug check to see if the timer is already fired. Since
174         // firing the timer can only happen with the driver lock held, we know
175         // we shouldn't be able to "miss" a transition to a fired state, even
176         // with relaxed ordering.
177         let mut cur_state = self.state.load(Ordering::Relaxed);
178 
179         loop {
180             // improve the error message for things like
181             // https://github.com/tokio-rs/tokio/issues/3675
182             assert!(
183                 cur_state < STATE_MIN_VALUE,
184                 "mark_pending called when the timer entry is in an invalid state"
185             );
186 
187             if cur_state > not_after {
188                 break Err(cur_state);
189             }
190 
191             match self.state.compare_exchange_weak(
192                 cur_state,
193                 STATE_PENDING_FIRE,
194                 Ordering::AcqRel,
195                 Ordering::Acquire,
196             ) {
197                 Ok(_) => break Ok(()),
198                 Err(actual_state) => cur_state = actual_state,
199             }
200         }
201     }
202 
203     /// Fires the timer, setting the result to the provided result.
204     ///
205     /// Returns:
206     /// * `Some(waker)` - if fired and a waker needs to be invoked once the
207     ///   driver lock is released
208     /// * `None` - if fired and a waker does not need to be invoked, or if
209     ///   already fired
210     ///
211     /// SAFETY: The driver lock must be held.
fire(&self, result: TimerResult) -> Option<Waker>212     unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
213         // Quick initial check to see if the timer is already fired. Since
214         // firing the timer can only happen with the driver lock held, we know
215         // we shouldn't be able to "miss" a transition to a fired state, even
216         // with relaxed ordering.
217         let cur_state = self.state.load(Ordering::Relaxed);
218         if cur_state == STATE_DEREGISTERED {
219             return None;
220         }
221 
222         // SAFETY: We assume the driver lock is held and the timer is not
223         // fired, so only the driver is accessing this field.
224         //
225         // We perform a release-ordered store to state below, to ensure this
226         // write is visible before the state update is visible.
227         unsafe { self.result.with_mut(|p| *p = result) };
228 
229         self.state.store(STATE_DEREGISTERED, Ordering::Release);
230 
231         self.waker.take_waker()
232     }
233 
234     /// Marks the timer as registered (poll will return None) and sets the
235     /// expiration time.
236     ///
237     /// While this function is memory-safe, it should only be called from a
238     /// context holding both `&mut TimerEntry` and the driver lock.
set_expiration(&self, timestamp: u64)239     fn set_expiration(&self, timestamp: u64) {
240         debug_assert!(timestamp < STATE_MIN_VALUE);
241 
242         // We can use relaxed ordering because we hold the driver lock and will
243         // fence when we release the lock.
244         self.state.store(timestamp, Ordering::Relaxed);
245     }
246 
247     /// Attempts to adjust the timer to a new timestamp.
248     ///
249     /// If the timer has already been fired, is pending firing, or the new
250     /// timestamp is earlier than the old timestamp, (or occasionally
251     /// spuriously) returns Err without changing the timer's state. In this
252     /// case, the timer must be deregistered and re-registered.
extend_expiration(&self, new_timestamp: u64) -> Result<(), ()>253     fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
254         let mut prior = self.state.load(Ordering::Relaxed);
255         loop {
256             if new_timestamp < prior || prior >= STATE_MIN_VALUE {
257                 return Err(());
258             }
259 
260             match self.state.compare_exchange_weak(
261                 prior,
262                 new_timestamp,
263                 Ordering::AcqRel,
264                 Ordering::Acquire,
265             ) {
266                 Ok(_) => return Ok(()),
267                 Err(true_prior) => prior = true_prior,
268             }
269         }
270     }
271 
272     /// Returns true if the state of this timer indicates that the timer might
273     /// be registered with the driver. This check is performed with relaxed
274     /// ordering, but is conservative - if it returns false, the timer is
275     /// definitely _not_ registered.
might_be_registered(&self) -> bool276     pub(super) fn might_be_registered(&self) -> bool {
277         self.state.load(Ordering::Relaxed) != u64::MAX
278     }
279 }
280 
281 /// A timer entry.
282 ///
283 /// This is the handle to a timer that is controlled by the requester of the
284 /// timer. As this participates in intrusive data structures, it must be pinned
285 /// before polling.
286 #[derive(Debug)]
287 pub(crate) struct TimerEntry {
288     /// Arc reference to the runtime handle. We can only free the driver after
289     /// deregistering everything from their respective timer wheels.
290     driver: scheduler::Handle,
291     /// Shared inner structure; this is part of an intrusive linked list, and
292     /// therefore other references can exist to it while mutable references to
293     /// Entry exist.
294     ///
295     /// This is manipulated only under the inner mutex. TODO: Can we use loom
296     /// cells for this?
297     inner: StdUnsafeCell<Option<TimerShared>>,
298     /// Deadline for the timer. This is used to register on the first
299     /// poll, as we can't register prior to being pinned.
300     deadline: Instant,
301     /// Whether the deadline has been registered.
302     registered: bool,
303     /// Ensure the type is !Unpin
304     _m: std::marker::PhantomPinned,
305 }
306 
307 unsafe impl Send for TimerEntry {}
308 unsafe impl Sync for TimerEntry {}
309 
310 /// An `TimerHandle` is the (non-enforced) "unique" pointer from the driver to the
311 /// timer entry. Generally, at most one `TimerHandle` exists for a timer at a time
312 /// (enforced by the timer state machine).
313 ///
314 /// SAFETY: An `TimerHandle` is essentially a raw pointer, and the usual caveats
315 /// of pointer safety apply. In particular, `TimerHandle` does not itself enforce
316 /// that the timer does still exist; however, normally an `TimerHandle` is created
317 /// immediately before registering the timer, and is consumed when firing the
318 /// timer, to help minimize mistakes. Still, because `TimerHandle` cannot enforce
319 /// memory safety, all operations are unsafe.
320 #[derive(Debug)]
321 pub(crate) struct TimerHandle {
322     inner: NonNull<TimerShared>,
323 }
324 
325 pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
326 
327 /// The shared state structure of a timer. This structure is shared between the
328 /// frontend (`Entry`) and driver backend.
329 ///
330 /// Note that this structure is located inside the `TimerEntry` structure.
331 pub(crate) struct TimerShared {
332     /// The shard id. We should never change it.
333     shard_id: u32,
334     /// A link within the doubly-linked list of timers on a particular level and
335     /// slot. Valid only if state is equal to Registered.
336     ///
337     /// Only accessed under the entry lock.
338     pointers: linked_list::Pointers<TimerShared>,
339 
340     /// The expiration time for which this entry is currently registered.
341     /// Generally owned by the driver, but is accessed by the entry when not
342     /// registered.
343     cached_when: AtomicU64,
344 
345     /// Current state. This records whether the timer entry is currently under
346     /// the ownership of the driver, and if not, its current state (not
347     /// complete, fired, error, etc).
348     state: StateCell,
349 
350     _p: PhantomPinned,
351 }
352 
353 unsafe impl Send for TimerShared {}
354 unsafe impl Sync for TimerShared {}
355 
356 impl std::fmt::Debug for TimerShared {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result357     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358         f.debug_struct("TimerShared")
359             .field("cached_when", &self.cached_when.load(Ordering::Relaxed))
360             .field("state", &self.state)
361             .finish()
362     }
363 }
364 
365 generate_addr_of_methods! {
366     impl<> TimerShared {
367         unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<TimerShared>> {
368             &self.pointers
369         }
370     }
371 }
372 
373 impl TimerShared {
new(shard_id: u32) -> Self374     pub(super) fn new(shard_id: u32) -> Self {
375         Self {
376             shard_id,
377             cached_when: AtomicU64::new(0),
378             pointers: linked_list::Pointers::new(),
379             state: StateCell::default(),
380             _p: PhantomPinned,
381         }
382     }
383 
384     /// Gets the cached time-of-expiration value.
cached_when(&self) -> u64385     pub(super) fn cached_when(&self) -> u64 {
386         // Cached-when is only accessed under the driver lock, so we can use relaxed
387         self.cached_when.load(Ordering::Relaxed)
388     }
389 
390     /// Gets the true time-of-expiration value, and copies it into the cached
391     /// time-of-expiration value.
392     ///
393     /// SAFETY: Must be called with the driver lock held, and when this entry is
394     /// not in any timer wheel lists.
sync_when(&self) -> u64395     pub(super) unsafe fn sync_when(&self) -> u64 {
396         let true_when = self.true_when();
397 
398         self.cached_when.store(true_when, Ordering::Relaxed);
399 
400         true_when
401     }
402 
403     /// Sets the cached time-of-expiration value.
404     ///
405     /// SAFETY: Must be called with the driver lock held, and when this entry is
406     /// not in any timer wheel lists.
set_cached_when(&self, when: u64)407     unsafe fn set_cached_when(&self, when: u64) {
408         self.cached_when.store(when, Ordering::Relaxed);
409     }
410 
411     /// Returns the true time-of-expiration value, with relaxed memory ordering.
true_when(&self) -> u64412     pub(super) fn true_when(&self) -> u64 {
413         self.state.when().expect("Timer already fired")
414     }
415 
416     /// Sets the true time-of-expiration value, even if it is less than the
417     /// current expiration or the timer is deregistered.
418     ///
419     /// SAFETY: Must only be called with the driver lock held and the entry not
420     /// in the timer wheel.
set_expiration(&self, t: u64)421     pub(super) unsafe fn set_expiration(&self, t: u64) {
422         self.state.set_expiration(t);
423         self.cached_when.store(t, Ordering::Relaxed);
424     }
425 
426     /// Sets the true time-of-expiration only if it is after the current.
extend_expiration(&self, t: u64) -> Result<(), ()>427     pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
428         self.state.extend_expiration(t)
429     }
430 
431     /// Returns a `TimerHandle` for this timer.
handle(&self) -> TimerHandle432     pub(super) fn handle(&self) -> TimerHandle {
433         TimerHandle {
434             inner: NonNull::from(self),
435         }
436     }
437 
438     /// Returns true if the state of this timer indicates that the timer might
439     /// be registered with the driver. This check is performed with relaxed
440     /// ordering, but is conservative - if it returns false, the timer is
441     /// definitely _not_ registered.
might_be_registered(&self) -> bool442     pub(super) fn might_be_registered(&self) -> bool {
443         self.state.might_be_registered()
444     }
445 
446     /// Gets the shard id.
shard_id(&self) -> u32447     pub(super) fn shard_id(&self) -> u32 {
448         self.shard_id
449     }
450 }
451 
452 unsafe impl linked_list::Link for TimerShared {
453     type Handle = TimerHandle;
454 
455     type Target = TimerShared;
456 
as_raw(handle: &Self::Handle) -> NonNull<Self::Target>457     fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> {
458         handle.inner
459     }
460 
from_raw(ptr: NonNull<Self::Target>) -> Self::Handle461     unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
462         TimerHandle { inner: ptr }
463     }
464 
pointers( target: NonNull<Self::Target>, ) -> NonNull<linked_list::Pointers<Self::Target>>465     unsafe fn pointers(
466         target: NonNull<Self::Target>,
467     ) -> NonNull<linked_list::Pointers<Self::Target>> {
468         TimerShared::addr_of_pointers(target)
469     }
470 }
471 
472 // ===== impl Entry =====
473 
474 impl TimerEntry {
475     #[track_caller]
new(handle: scheduler::Handle, deadline: Instant) -> Self476     pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
477         // Panic if the time driver is not enabled
478         let _ = handle.driver().time();
479 
480         Self {
481             driver: handle,
482             inner: StdUnsafeCell::new(None),
483             deadline,
484             registered: false,
485             _m: std::marker::PhantomPinned,
486         }
487     }
488 
is_inner_init(&self) -> bool489     fn is_inner_init(&self) -> bool {
490         unsafe { &*self.inner.get() }.is_some()
491     }
492 
493     // This lazy initialization is for performance purposes.
inner(&self) -> &TimerShared494     fn inner(&self) -> &TimerShared {
495         let inner = unsafe { &*self.inner.get() };
496         if inner.is_none() {
497             let shard_size = self.driver.driver().time().inner.get_shard_size();
498             let shard_id = generate_shard_id(shard_size);
499             unsafe {
500                 *self.inner.get() = Some(TimerShared::new(shard_id));
501             }
502         }
503         return inner.as_ref().unwrap();
504     }
505 
deadline(&self) -> Instant506     pub(crate) fn deadline(&self) -> Instant {
507         self.deadline
508     }
509 
is_elapsed(&self) -> bool510     pub(crate) fn is_elapsed(&self) -> bool {
511         self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered
512     }
513 
514     /// Cancels and deregisters the timer. This operation is irreversible.
cancel(self: Pin<&mut Self>)515     pub(crate) fn cancel(self: Pin<&mut Self>) {
516         // Avoid calling the `clear_entry` method, because it has not been initialized yet.
517         if !self.is_inner_init() {
518             return;
519         }
520         // We need to perform an acq/rel fence with the driver thread, and the
521         // simplest way to do so is to grab the driver lock.
522         //
523         // Why is this necessary? We're about to release this timer's memory for
524         // some other non-timer use. However, we've been doing a bunch of
525         // relaxed (or even non-atomic) writes from the driver thread, and we'll
526         // be doing more from _this thread_ (as this memory is interpreted as
527         // something else).
528         //
529         // It is critical to ensure that, from the point of view of the driver,
530         // those future non-timer writes happen-after the timer is fully fired,
531         // and from the purpose of this thread, the driver's writes all
532         // happen-before we drop the timer. This in turn requires us to perform
533         // an acquire-release barrier in _both_ directions between the driver
534         // and dropping thread.
535         //
536         // The lock acquisition in clear_entry serves this purpose. All of the
537         // driver manipulations happen with the lock held, so we can just take
538         // the lock and be sure that this drop happens-after everything the
539         // driver did so far and happens-before everything the driver does in
540         // the future. While we have the lock held, we also go ahead and
541         // deregister the entry if necessary.
542         unsafe { self.driver().clear_entry(NonNull::from(self.inner())) };
543     }
544 
reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool)545     pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
546         let this = unsafe { self.as_mut().get_unchecked_mut() };
547         this.deadline = new_time;
548         this.registered = reregister;
549 
550         let tick = self.driver().time_source().deadline_to_tick(new_time);
551 
552         if self.inner().extend_expiration(tick).is_ok() {
553             return;
554         }
555 
556         if reregister {
557             unsafe {
558                 self.driver()
559                     .reregister(&self.driver.driver().io, tick, self.inner().into());
560             }
561         }
562     }
563 
poll_elapsed( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), super::Error>>564     pub(crate) fn poll_elapsed(
565         mut self: Pin<&mut Self>,
566         cx: &mut Context<'_>,
567     ) -> Poll<Result<(), super::Error>> {
568         assert!(
569             !self.driver().is_shutdown(),
570             "{}",
571             crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
572         );
573 
574         if !self.registered {
575             let deadline = self.deadline;
576             self.as_mut().reset(deadline, true);
577         }
578 
579         self.inner().state.poll(cx.waker())
580     }
581 
driver(&self) -> &super::Handle582     pub(crate) fn driver(&self) -> &super::Handle {
583         self.driver.driver().time()
584     }
585 
586     #[cfg(all(tokio_unstable, feature = "tracing"))]
clock(&self) -> &super::Clock587     pub(crate) fn clock(&self) -> &super::Clock {
588         self.driver.driver().clock()
589     }
590 }
591 
592 impl TimerHandle {
cached_when(&self) -> u64593     pub(super) unsafe fn cached_when(&self) -> u64 {
594         unsafe { self.inner.as_ref().cached_when() }
595     }
596 
sync_when(&self) -> u64597     pub(super) unsafe fn sync_when(&self) -> u64 {
598         unsafe { self.inner.as_ref().sync_when() }
599     }
600 
is_pending(&self) -> bool601     pub(super) unsafe fn is_pending(&self) -> bool {
602         unsafe { self.inner.as_ref().state.is_pending() }
603     }
604 
605     /// Forcibly sets the true and cached expiration times to the given tick.
606     ///
607     /// SAFETY: The caller must ensure that the handle remains valid, the driver
608     /// lock is held, and that the timer is not in any wheel linked lists.
set_expiration(&self, tick: u64)609     pub(super) unsafe fn set_expiration(&self, tick: u64) {
610         self.inner.as_ref().set_expiration(tick);
611     }
612 
613     /// Attempts to mark this entry as pending. If the expiration time is after
614     /// `not_after`, however, returns an Err with the current expiration time.
615     ///
616     /// If an `Err` is returned, the `cached_when` value will be updated to this
617     /// new expiration time.
618     ///
619     /// SAFETY: The caller must ensure that the handle remains valid, the driver
620     /// lock is held, and that the timer is not in any wheel linked lists.
621     /// After returning Ok, the entry must be added to the pending list.
mark_pending(&self, not_after: u64) -> Result<(), u64>622     pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
623         match self.inner.as_ref().state.mark_pending(not_after) {
624             Ok(()) => {
625                 // mark this as being on the pending queue in cached_when
626                 self.inner.as_ref().set_cached_when(u64::MAX);
627                 Ok(())
628             }
629             Err(tick) => {
630                 self.inner.as_ref().set_cached_when(tick);
631                 Err(tick)
632             }
633         }
634     }
635 
636     /// Attempts to transition to a terminal state. If the state is already a
637     /// terminal state, does nothing.
638     ///
639     /// Because the entry might be dropped after the state is moved to a
640     /// terminal state, this function consumes the handle to ensure we don't
641     /// access the entry afterwards.
642     ///
643     /// Returns the last-registered waker, if any.
644     ///
645     /// SAFETY: The driver lock must be held while invoking this function, and
646     /// the entry must not be in any wheel linked lists.
fire(self, completed_state: TimerResult) -> Option<Waker>647     pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
648         self.inner.as_ref().state.fire(completed_state)
649     }
650 }
651 
652 impl Drop for TimerEntry {
drop(&mut self)653     fn drop(&mut self) {
654         unsafe { Pin::new_unchecked(self) }.as_mut().cancel();
655     }
656 }
657 
658 // Generates a shard id. If current thread is a worker thread, we use its worker index as a shard id.
659 // Otherwise, we use a random number generator to obtain the shard id.
660 cfg_rt! {
661     fn generate_shard_id(shard_size: u32) -> u32 {
662         let id = context::with_scheduler(|ctx| match ctx {
663             Some(scheduler::Context::CurrentThread(_ctx)) => 0,
664             #[cfg(feature = "rt-multi-thread")]
665             Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
666             #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
667             Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32,
668             None => context::thread_rng_n(shard_size),
669         });
670         id % shard_size
671     }
672 }
673 
674 cfg_not_rt! {
675     fn generate_shard_id(shard_size: u32) -> u32 {
676         context::thread_rng_n(shard_size)
677     }
678 }
679