1 use crate::runtime::time::TimerEntry;
2 use crate::time::{error::Error, Duration, Instant};
3 use crate::util::trace;
4 
5 use pin_project_lite::pin_project;
6 use std::future::Future;
7 use std::panic::Location;
8 use std::pin::Pin;
9 use std::task::{self, ready, Poll};
10 
11 /// Waits until `deadline` is reached.
12 ///
13 /// No work is performed while awaiting on the sleep future to complete. `Sleep`
14 /// operates at millisecond granularity and should not be used for tasks that
15 /// require high-resolution timers.
16 ///
17 /// To run something regularly on a schedule, see [`interval`].
18 ///
19 /// # Cancellation
20 ///
21 /// Canceling a sleep instance is done by dropping the returned future. No additional
22 /// cleanup work is required.
23 ///
24 /// # Examples
25 ///
26 /// Wait 100ms and print "100 ms have elapsed".
27 ///
28 /// ```
29 /// use tokio::time::{sleep_until, Instant, Duration};
30 ///
31 /// #[tokio::main]
32 /// async fn main() {
33 ///     sleep_until(Instant::now() + Duration::from_millis(100)).await;
34 ///     println!("100 ms have elapsed");
35 /// }
36 /// ```
37 ///
38 /// See the documentation for the [`Sleep`] type for more examples.
39 ///
40 /// # Panics
41 ///
42 /// This function panics if there is no current timer set.
43 ///
44 /// It can be triggered when [`Builder::enable_time`] or
45 /// [`Builder::enable_all`] are not included in the builder.
46 ///
47 /// It can also panic whenever a timer is created outside of a
48 /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
49 /// since the function is executed outside of the runtime.
50 /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
51 /// And this is because wrapping the function on an async makes it lazy,
52 /// and so gets executed inside the runtime successfully without
53 /// panicking.
54 ///
55 /// [`Sleep`]: struct@crate::time::Sleep
56 /// [`interval`]: crate::time::interval()
57 /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
58 /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
59 // Alias for old name in 0.x
60 #[cfg_attr(docsrs, doc(alias = "delay_until"))]
61 #[track_caller]
sleep_until(deadline: Instant) -> Sleep62 pub fn sleep_until(deadline: Instant) -> Sleep {
63     Sleep::new_timeout(deadline, trace::caller_location())
64 }
65 
66 /// Waits until `duration` has elapsed.
67 ///
68 /// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
69 /// analog to `std::thread::sleep`.
70 ///
71 /// No work is performed while awaiting on the sleep future to complete. `Sleep`
72 /// operates at millisecond granularity and should not be used for tasks that
73 /// require high-resolution timers. The implementation is platform specific,
74 /// and some platforms (specifically Windows) will provide timers with a
75 /// larger resolution than 1 ms.
76 ///
77 /// To run something regularly on a schedule, see [`interval`].
78 ///
79 /// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years).
80 ///
81 /// # Cancellation
82 ///
83 /// Canceling a sleep instance is done by dropping the returned future. No additional
84 /// cleanup work is required.
85 ///
86 /// # Examples
87 ///
88 /// Wait 100ms and print "100 ms have elapsed".
89 ///
90 /// ```
91 /// use tokio::time::{sleep, Duration};
92 ///
93 /// #[tokio::main]
94 /// async fn main() {
95 ///     sleep(Duration::from_millis(100)).await;
96 ///     println!("100 ms have elapsed");
97 /// }
98 /// ```
99 ///
100 /// See the documentation for the [`Sleep`] type for more examples.
101 ///
102 /// # Panics
103 ///
104 /// This function panics if there is no current timer set.
105 ///
106 /// It can be triggered when [`Builder::enable_time`] or
107 /// [`Builder::enable_all`] are not included in the builder.
108 ///
109 /// It can also panic whenever a timer is created outside of a
110 /// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
111 /// since the function is executed outside of the runtime.
112 /// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
113 /// And this is because wrapping the function on an async makes it lazy,
114 /// and so gets executed inside the runtime successfully without
115 /// panicking.
116 ///
117 /// [`Sleep`]: struct@crate::time::Sleep
118 /// [`interval`]: crate::time::interval()
119 /// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
120 /// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
121 // Alias for old name in 0.x
122 #[cfg_attr(docsrs, doc(alias = "delay_for"))]
123 #[cfg_attr(docsrs, doc(alias = "wait"))]
124 #[track_caller]
sleep(duration: Duration) -> Sleep125 pub fn sleep(duration: Duration) -> Sleep {
126     let location = trace::caller_location();
127 
128     match Instant::now().checked_add(duration) {
129         Some(deadline) => Sleep::new_timeout(deadline, location),
130         None => Sleep::new_timeout(Instant::far_future(), location),
131     }
132 }
133 
134 pin_project! {
135     /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
136     ///
137     /// This type does not implement the `Unpin` trait, which means that if you
138     /// use it with [`select!`] or by calling `poll`, you have to pin it first.
139     /// If you use it with `.await`, this does not apply.
140     ///
141     /// # Examples
142     ///
143     /// Wait 100ms and print "100 ms have elapsed".
144     ///
145     /// ```
146     /// use tokio::time::{sleep, Duration};
147     ///
148     /// #[tokio::main]
149     /// async fn main() {
150     ///     sleep(Duration::from_millis(100)).await;
151     ///     println!("100 ms have elapsed");
152     /// }
153     /// ```
154     ///
155     /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
156     /// necessary when the same `Sleep` is selected on multiple times.
157     /// ```no_run
158     /// use tokio::time::{self, Duration, Instant};
159     ///
160     /// #[tokio::main]
161     /// async fn main() {
162     ///     let sleep = time::sleep(Duration::from_millis(10));
163     ///     tokio::pin!(sleep);
164     ///
165     ///     loop {
166     ///         tokio::select! {
167     ///             () = &mut sleep => {
168     ///                 println!("timer elapsed");
169     ///                 sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
170     ///             },
171     ///         }
172     ///     }
173     /// }
174     /// ```
175     /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
176     /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
177     /// ```
178     /// use std::future::Future;
179     /// use std::pin::Pin;
180     /// use std::task::{Context, Poll};
181     /// use tokio::time::Sleep;
182     ///
183     /// struct HasSleep {
184     ///     sleep: Pin<Box<Sleep>>,
185     /// }
186     ///
187     /// impl Future for HasSleep {
188     ///     type Output = ();
189     ///
190     ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
191     ///         self.sleep.as_mut().poll(cx)
192     ///     }
193     /// }
194     /// ```
195     /// Use in a struct with pin projection. This method avoids the `Box`, but
196     /// the `HasSleep` struct will not be `Unpin` as a consequence.
197     /// ```
198     /// use std::future::Future;
199     /// use std::pin::Pin;
200     /// use std::task::{Context, Poll};
201     /// use tokio::time::Sleep;
202     /// use pin_project_lite::pin_project;
203     ///
204     /// pin_project! {
205     ///     struct HasSleep {
206     ///         #[pin]
207     ///         sleep: Sleep,
208     ///     }
209     /// }
210     ///
211     /// impl Future for HasSleep {
212     ///     type Output = ();
213     ///
214     ///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
215     ///         self.project().sleep.poll(cx)
216     ///     }
217     /// }
218     /// ```
219     ///
220     /// [`select!`]: ../macro.select.html
221     /// [`tokio::pin!`]: ../macro.pin.html
222     #[project(!Unpin)]
223     // Alias for old name in 0.2
224     #[cfg_attr(docsrs, doc(alias = "Delay"))]
225     #[derive(Debug)]
226     #[must_use = "futures do nothing unless you `.await` or poll them"]
227     pub struct Sleep {
228         inner: Inner,
229 
230         // The link between the `Sleep` instance and the timer that drives it.
231         #[pin]
232         entry: TimerEntry,
233     }
234 }
235 
236 cfg_trace! {
237     #[derive(Debug)]
238     struct Inner {
239         ctx: trace::AsyncOpTracingCtx,
240     }
241 }
242 
243 cfg_not_trace! {
244     #[derive(Debug)]
245     struct Inner {
246     }
247 }
248 
249 impl Sleep {
250     #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
251     #[track_caller]
new_timeout( deadline: Instant, location: Option<&'static Location<'static>>, ) -> Sleep252     pub(crate) fn new_timeout(
253         deadline: Instant,
254         location: Option<&'static Location<'static>>,
255     ) -> Sleep {
256         use crate::runtime::scheduler;
257         let handle = scheduler::Handle::current();
258         let entry = TimerEntry::new(handle, deadline);
259         #[cfg(all(tokio_unstable, feature = "tracing"))]
260         let inner = {
261             let handle = scheduler::Handle::current();
262             let clock = handle.driver().clock();
263             let handle = &handle.driver().time();
264             let time_source = handle.time_source();
265             let deadline_tick = time_source.deadline_to_tick(deadline);
266             let duration = deadline_tick.saturating_sub(time_source.now(clock));
267 
268             let location = location.expect("should have location if tracing");
269             let resource_span = tracing::trace_span!(
270                 parent: None,
271                 "runtime.resource",
272                 concrete_type = "Sleep",
273                 kind = "timer",
274                 loc.file = location.file(),
275                 loc.line = location.line(),
276                 loc.col = location.column(),
277             );
278 
279             let async_op_span = resource_span.in_scope(|| {
280                 tracing::trace!(
281                     target: "runtime::resource::state_update",
282                     duration = duration,
283                     duration.unit = "ms",
284                     duration.op = "override",
285                 );
286 
287                 tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout")
288             });
289 
290             let async_op_poll_span =
291                 async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
292 
293             let ctx = trace::AsyncOpTracingCtx {
294                 async_op_span,
295                 async_op_poll_span,
296                 resource_span,
297             };
298 
299             Inner { ctx }
300         };
301 
302         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
303         let inner = Inner {};
304 
305         Sleep { inner, entry }
306     }
307 
far_future(location: Option<&'static Location<'static>>) -> Sleep308     pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
309         Self::new_timeout(Instant::far_future(), location)
310     }
311 
312     /// Returns the instant at which the future will complete.
deadline(&self) -> Instant313     pub fn deadline(&self) -> Instant {
314         self.entry.deadline()
315     }
316 
317     /// Returns `true` if `Sleep` has elapsed.
318     ///
319     /// A `Sleep` instance is elapsed when the requested duration has elapsed.
is_elapsed(&self) -> bool320     pub fn is_elapsed(&self) -> bool {
321         self.entry.is_elapsed()
322     }
323 
324     /// Resets the `Sleep` instance to a new deadline.
325     ///
326     /// Calling this function allows changing the instant at which the `Sleep`
327     /// future completes without having to create new associated state.
328     ///
329     /// This function can be called both before and after the future has
330     /// completed.
331     ///
332     /// To call this method, you will usually combine the call with
333     /// [`Pin::as_mut`], which lets you call the method without consuming the
334     /// `Sleep` itself.
335     ///
336     /// # Example
337     ///
338     /// ```
339     /// use tokio::time::{Duration, Instant};
340     ///
341     /// # #[tokio::main(flavor = "current_thread")]
342     /// # async fn main() {
343     /// let sleep = tokio::time::sleep(Duration::from_millis(10));
344     /// tokio::pin!(sleep);
345     ///
346     /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
347     /// # }
348     /// ```
349     ///
350     /// See also the top-level examples.
351     ///
352     /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
reset(self: Pin<&mut Self>, deadline: Instant)353     pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
354         self.reset_inner(deadline);
355     }
356 
357     /// Resets the `Sleep` instance to a new deadline without reregistering it
358     /// to be woken up.
359     ///
360     /// Calling this function allows changing the instant at which the `Sleep`
361     /// future completes without having to create new associated state and
362     /// without having it registered. This is required in e.g. the
363     /// [`crate::time::Interval`] where we want to reset the internal [Sleep]
364     /// without having it wake up the last task that polled it.
reset_without_reregister(self: Pin<&mut Self>, deadline: Instant)365     pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) {
366         let mut me = self.project();
367         me.entry.as_mut().reset(deadline, false);
368     }
369 
reset_inner(self: Pin<&mut Self>, deadline: Instant)370     fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
371         let mut me = self.project();
372         me.entry.as_mut().reset(deadline, true);
373 
374         #[cfg(all(tokio_unstable, feature = "tracing"))]
375         {
376             let _resource_enter = me.inner.ctx.resource_span.enter();
377             me.inner.ctx.async_op_span =
378                 tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
379             let _async_op_enter = me.inner.ctx.async_op_span.enter();
380 
381             me.inner.ctx.async_op_poll_span =
382                 tracing::trace_span!("runtime.resource.async_op.poll");
383 
384             let duration = {
385                 let clock = me.entry.clock();
386                 let time_source = me.entry.driver().time_source();
387                 let now = time_source.now(clock);
388                 let deadline_tick = time_source.deadline_to_tick(deadline);
389                 deadline_tick.saturating_sub(now)
390             };
391 
392             tracing::trace!(
393                 target: "runtime::resource::state_update",
394                 duration = duration,
395                 duration.unit = "ms",
396                 duration.op = "override",
397             );
398         }
399     }
400 
poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>>401     fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
402         let me = self.project();
403 
404         ready!(crate::trace::trace_leaf(cx));
405 
406         // Keep track of task budget
407         #[cfg(all(tokio_unstable, feature = "tracing"))]
408         let coop = ready!(trace_poll_op!(
409             "poll_elapsed",
410             crate::runtime::coop::poll_proceed(cx),
411         ));
412 
413         #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
414         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
415 
416         let result = me.entry.poll_elapsed(cx).map(move |r| {
417             coop.made_progress();
418             r
419         });
420 
421         #[cfg(all(tokio_unstable, feature = "tracing"))]
422         return trace_poll_op!("poll_elapsed", result);
423 
424         #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
425         return result;
426     }
427 }
428 
429 impl Future for Sleep {
430     type Output = ();
431 
432     // `poll_elapsed` can return an error in two cases:
433     //
434     // - AtCapacity: this is a pathological case where far too many
435     //   sleep instances have been scheduled.
436     // - Shutdown: No timer has been setup, which is a mis-use error.
437     //
438     // Both cases are extremely rare, and pretty accurately fit into
439     // "logic errors", so we just panic in this case. A user couldn't
440     // really do much better if we passed the error onwards.
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>441     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
442         #[cfg(all(tokio_unstable, feature = "tracing"))]
443         let _res_span = self.inner.ctx.resource_span.clone().entered();
444         #[cfg(all(tokio_unstable, feature = "tracing"))]
445         let _ao_span = self.inner.ctx.async_op_span.clone().entered();
446         #[cfg(all(tokio_unstable, feature = "tracing"))]
447         let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered();
448         match ready!(self.as_mut().poll_elapsed(cx)) {
449             Ok(()) => Poll::Ready(()),
450             Err(e) => panic!("timer error: {e}"),
451         }
452     }
453 }
454