1 use super::BOX_FUTURE_THRESHOLD;
2 use crate::runtime::blocking::BlockingPool;
3 use crate::runtime::scheduler::CurrentThread;
4 use crate::runtime::{context, EnterGuard, Handle};
5 use crate::task::JoinHandle;
6 use crate::util::trace::SpawnMeta;
7 
8 use std::future::Future;
9 use std::mem;
10 use std::time::Duration;
11 
12 cfg_rt_multi_thread! {
13     use crate::runtime::Builder;
14     use crate::runtime::scheduler::MultiThread;
15 
16     cfg_unstable! {
17         use crate::runtime::scheduler::MultiThreadAlt;
18     }
19 }
20 
21 /// The Tokio runtime.
22 ///
23 /// The runtime provides an I/O driver, task scheduler, [timer], and
24 /// blocking pool, necessary for running asynchronous tasks.
25 ///
26 /// Instances of `Runtime` can be created using [`new`], or [`Builder`].
27 /// However, most users will use the [`#[tokio::main]`][main] annotation on
28 /// their entry point instead.
29 ///
30 /// See [module level][mod] documentation for more details.
31 ///
32 /// # Shutdown
33 ///
34 /// Shutting down the runtime is done by dropping the value, or calling
35 /// [`shutdown_background`] or [`shutdown_timeout`].
36 ///
37 /// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
38 /// Then they are dropped. They are not *guaranteed* to run to completion, but
39 /// *might* do so if they do not yield until completion.
40 ///
41 /// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
42 /// until they return.
43 ///
44 /// The thread initiating the shutdown blocks until all spawned work has been
45 /// stopped. This can take an indefinite amount of time. The `Drop`
46 /// implementation waits forever for this.
47 ///
48 /// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
49 /// waiting forever is undesired. When the timeout is reached, spawned work that
50 /// did not stop in time and threads running it are leaked. The work continues
51 /// to run until one of the stopping conditions is fulfilled, but the thread
52 /// initiating the shutdown is unblocked.
53 ///
54 /// Once the runtime has been dropped, any outstanding I/O resources bound to
55 /// it will no longer function. Calling any method on them will result in an
56 /// error.
57 ///
58 /// # Sharing
59 ///
60 /// There are several ways to establish shared access to a Tokio runtime:
61 ///
62 ///  * Using an <code>[Arc]\<Runtime></code>.
63 ///  * Using a [`Handle`].
64 ///  * Entering the runtime context.
65 ///
66 /// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
67 /// things with the runtime such as spawning new tasks or entering the runtime
68 /// context. Both types can be cloned to create a new handle that allows access
69 /// to the same runtime. By passing clones into different tasks or threads, you
70 /// will be able to access the runtime from those tasks or threads.
71 ///
72 /// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
73 /// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
74 /// whereas a [`Handle`] does not prevent that. This is because shutdown of the
75 /// runtime happens when the destructor of the `Runtime` object runs.
76 ///
77 /// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
78 /// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
79 /// this can be achieved via [`Arc::try_unwrap`] when only one strong count
80 /// reference is left over.
81 ///
82 /// The runtime context is entered using the [`Runtime::enter`] or
83 /// [`Handle::enter`] methods, which use a thread-local variable to store the
84 /// current runtime. Whenever you are inside the runtime context, methods such
85 /// as [`tokio::spawn`] will use the runtime whose context you are inside.
86 ///
87 /// [timer]: crate::time
88 /// [mod]: index.html
89 /// [`new`]: method@Self::new
90 /// [`Builder`]: struct@Builder
91 /// [`Handle`]: struct@Handle
92 /// [main]: macro@crate::main
93 /// [`tokio::spawn`]: crate::spawn
94 /// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
95 /// [Arc]: std::sync::Arc
96 /// [`shutdown_background`]: method@Runtime::shutdown_background
97 /// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
#[derive(Debug)]
pub struct Runtime {
    /// Task scheduler. The `Drop` implementation shuts it down explicitly,
    /// and `block_on` dispatches to it.
    scheduler: Scheduler,

    /// Handle to runtime, also contains driver handles. Spawning, entering
    /// the runtime context, and metrics are all delegated to this handle.
    handle: Handle,

    /// Blocking pool handle, used to signal shutdown. `shutdown_timeout`
    /// passes its timeout to the pool's own shutdown.
    blocking_pool: BlockingPool,
}
109 
/// The flavor of a `Runtime`.
///
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
///
/// The type is a plain, field-less tag, so it is `Copy` and can be stored,
/// compared, and used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RuntimeFlavor {
    /// The flavor that executes all tasks on the current thread.
    CurrentThread,
    /// The flavor that executes tasks across multiple threads.
    MultiThread,
    /// The flavor that executes tasks across multiple threads using the
    /// `MultiThreadAlt` scheduler. Only available with `tokio_unstable`.
    #[cfg(tokio_unstable)]
    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    MultiThreadAlt,
}
125 
/// The runtime scheduler is either a multi-thread or a current-thread executor.
///
/// The multi-thread variants only exist when the `rt-multi-thread` feature is
/// enabled; the current-thread variant is always available.
#[derive(Debug)]
pub(super) enum Scheduler {
    /// Execute all tasks on the current-thread.
    CurrentThread(CurrentThread),

    /// Execute tasks across multiple threads.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(MultiThread),

    /// Execute tasks across multiple threads using the `MultiThreadAlt`
    /// scheduler, which additionally requires `tokio_unstable`.
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt(MultiThreadAlt),
}
140 
141 impl Runtime {
from_parts( scheduler: Scheduler, handle: Handle, blocking_pool: BlockingPool, ) -> Runtime142     pub(super) fn from_parts(
143         scheduler: Scheduler,
144         handle: Handle,
145         blocking_pool: BlockingPool,
146     ) -> Runtime {
147         Runtime {
148             scheduler,
149             handle,
150             blocking_pool,
151         }
152     }
153 
154     /// Creates a new runtime instance with default configuration values.
155     ///
156     /// This results in the multi threaded scheduler, I/O driver, and time driver being
157     /// initialized.
158     ///
159     /// Most applications will not need to call this function directly. Instead,
160     /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
161     /// configuration is necessary, the [runtime builder] may be used.
162     ///
163     /// See [module level][mod] documentation for more details.
164     ///
165     /// # Examples
166     ///
167     /// Creating a new `Runtime` with default configuration values.
168     ///
169     /// ```
170     /// use tokio::runtime::Runtime;
171     ///
172     /// let rt = Runtime::new()
173     ///     .unwrap();
174     ///
175     /// // Use the runtime...
176     /// ```
177     ///
178     /// [mod]: index.html
179     /// [main]: ../attr.main.html
180     /// [threaded scheduler]: index.html#threaded-scheduler
181     /// [runtime builder]: crate::runtime::Builder
182     #[cfg(feature = "rt-multi-thread")]
183     #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
new() -> std::io::Result<Runtime>184     pub fn new() -> std::io::Result<Runtime> {
185         Builder::new_multi_thread().enable_all().build()
186     }
187 
188     /// Returns a handle to the runtime's spawner.
189     ///
190     /// The returned handle can be used to spawn tasks that run on this runtime, and can
191     /// be cloned to allow moving the `Handle` to other threads.
192     ///
193     /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
194     /// Refer to the documentation of [`Handle::block_on`] for more.
195     ///
196     /// # Examples
197     ///
198     /// ```
199     /// use tokio::runtime::Runtime;
200     ///
201     /// let rt = Runtime::new()
202     ///     .unwrap();
203     ///
204     /// let handle = rt.handle();
205     ///
206     /// // Use the handle...
207     /// ```
handle(&self) -> &Handle208     pub fn handle(&self) -> &Handle {
209         &self.handle
210     }
211 
212     /// Spawns a future onto the Tokio runtime.
213     ///
214     /// This spawns the given future onto the runtime's executor, usually a
215     /// thread pool. The thread pool is then responsible for polling the future
216     /// until it completes.
217     ///
218     /// The provided future will start running in the background immediately
219     /// when `spawn` is called, even if you don't await the returned
220     /// `JoinHandle`.
221     ///
222     /// See [module level][mod] documentation for more details.
223     ///
224     /// [mod]: index.html
225     ///
226     /// # Examples
227     ///
228     /// ```
229     /// use tokio::runtime::Runtime;
230     ///
231     /// # fn dox() {
232     /// // Create the runtime
233     /// let rt = Runtime::new().unwrap();
234     ///
235     /// // Spawn a future onto the runtime
236     /// rt.spawn(async {
237     ///     println!("now running on a worker thread");
238     /// });
239     /// # }
240     /// ```
241     #[track_caller]
spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,242     pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
243     where
244         F: Future + Send + 'static,
245         F::Output: Send + 'static,
246     {
247         let fut_size = mem::size_of::<F>();
248         if fut_size > BOX_FUTURE_THRESHOLD {
249             self.handle
250                 .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
251         } else {
252             self.handle
253                 .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
254         }
255     }
256 
257     /// Runs the provided function on an executor dedicated to blocking operations.
258     ///
259     /// # Examples
260     ///
261     /// ```
262     /// use tokio::runtime::Runtime;
263     ///
264     /// # fn dox() {
265     /// // Create the runtime
266     /// let rt = Runtime::new().unwrap();
267     ///
268     /// // Spawn a blocking function onto the runtime
269     /// rt.spawn_blocking(|| {
270     ///     println!("now running on a worker thread");
271     /// });
272     /// # }
273     /// ```
274     #[track_caller]
spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,275     pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
276     where
277         F: FnOnce() -> R + Send + 'static,
278         R: Send + 'static,
279     {
280         self.handle.spawn_blocking(func)
281     }
282 
283     /// Runs a future to completion on the Tokio runtime. This is the
284     /// runtime's entry point.
285     ///
286     /// This runs the given future on the current thread, blocking until it is
287     /// complete, and yielding its resolved result. Any tasks or timers
288     /// which the future spawns internally will be executed on the runtime.
289     ///
290     /// # Non-worker future
291     ///
292     /// Note that the future required by this function does not run as a
293     /// worker. The expectation is that other tasks are spawned by the future here.
294     /// Awaiting on other futures from the future provided here will not
295     /// perform as fast as those spawned as workers.
296     ///
297     /// # Multi thread scheduler
298     ///
299     /// When the multi thread scheduler is used this will allow futures
300     /// to run within the io driver and timer context of the overall runtime.
301     ///
302     /// Any spawned tasks will continue running after `block_on` returns.
303     ///
304     /// # Current thread scheduler
305     ///
306     /// When the current thread scheduler is enabled `block_on`
307     /// can be called concurrently from multiple threads. The first call
308     /// will take ownership of the io and timer drivers. This means
309     /// other threads which do not own the drivers will hook into that one.
310     /// When the first `block_on` completes, other threads will be able to
311     /// "steal" the driver to allow continued execution of their futures.
312     ///
313     /// Any spawned tasks will be suspended after `block_on` returns. Calling
314     /// `block_on` again will resume previously spawned tasks.
315     ///
316     /// # Panics
317     ///
318     /// This function panics if the provided future panics, or if called within an
319     /// asynchronous execution context.
320     ///
321     /// # Examples
322     ///
323     /// ```no_run
324     /// use tokio::runtime::Runtime;
325     ///
326     /// // Create the runtime
327     /// let rt  = Runtime::new().unwrap();
328     ///
329     /// // Execute the future, blocking the current thread until completion
330     /// rt.block_on(async {
331     ///     println!("hello");
332     /// });
333     /// ```
334     ///
335     /// [handle]: fn@Handle::block_on
336     #[track_caller]
block_on<F: Future>(&self, future: F) -> F::Output337     pub fn block_on<F: Future>(&self, future: F) -> F::Output {
338         let fut_size = mem::size_of::<F>();
339         if fut_size > BOX_FUTURE_THRESHOLD {
340             self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
341         } else {
342             self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
343         }
344     }
345 
346     #[track_caller]
block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output347     fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
348         #[cfg(all(
349             tokio_unstable,
350             tokio_taskdump,
351             feature = "rt",
352             target_os = "linux",
353             any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
354         ))]
355         let future = super::task::trace::Trace::root(future);
356 
357         #[cfg(all(tokio_unstable, feature = "tracing"))]
358         let future = crate::util::trace::task(
359             future,
360             "block_on",
361             _meta,
362             crate::runtime::task::Id::next().as_u64(),
363         );
364 
365         let _enter = self.enter();
366 
367         match &self.scheduler {
368             Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
369             #[cfg(feature = "rt-multi-thread")]
370             Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
371             #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
372             Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
373         }
374     }
375 
376     /// Enters the runtime context.
377     ///
378     /// This allows you to construct types that must have an executor
379     /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
380     /// also allow you to call methods such as [`tokio::spawn`].
381     ///
382     /// [`Sleep`]: struct@crate::time::Sleep
383     /// [`TcpStream`]: struct@crate::net::TcpStream
384     /// [`tokio::spawn`]: fn@crate::spawn
385     ///
386     /// # Example
387     ///
388     /// ```
389     /// use tokio::runtime::Runtime;
390     /// use tokio::task::JoinHandle;
391     ///
392     /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
393     ///     // Had we not used `rt.enter` below, this would panic.
394     ///     tokio::spawn(async move {
395     ///         println!("{}", msg);
396     ///     })
397     /// }
398     ///
399     /// fn main() {
400     ///     let rt = Runtime::new().unwrap();
401     ///
402     ///     let s = "Hello World!".to_string();
403     ///
404     ///     // By entering the context, we tie `tokio::spawn` to this executor.
405     ///     let _guard = rt.enter();
406     ///     let handle = function_that_spawns(s);
407     ///
408     ///     // Wait for the task before we end the test.
409     ///     rt.block_on(handle).unwrap();
410     /// }
411     /// ```
enter(&self) -> EnterGuard<'_>412     pub fn enter(&self) -> EnterGuard<'_> {
413         self.handle.enter()
414     }
415 
416     /// Shuts down the runtime, waiting for at most `duration` for all spawned
417     /// work to stop.
418     ///
419     /// See the [struct level documentation](Runtime#shutdown) for more details.
420     ///
421     /// # Examples
422     ///
423     /// ```
424     /// use tokio::runtime::Runtime;
425     /// use tokio::task;
426     ///
427     /// use std::thread;
428     /// use std::time::Duration;
429     ///
430     /// fn main() {
431     ///    let runtime = Runtime::new().unwrap();
432     ///
433     ///    runtime.block_on(async move {
434     ///        task::spawn_blocking(move || {
435     ///            thread::sleep(Duration::from_secs(10_000));
436     ///        });
437     ///    });
438     ///
439     ///    runtime.shutdown_timeout(Duration::from_millis(100));
440     /// }
441     /// ```
shutdown_timeout(mut self, duration: Duration)442     pub fn shutdown_timeout(mut self, duration: Duration) {
443         // Wakeup and shutdown all the worker threads
444         self.handle.inner.shutdown();
445         self.blocking_pool.shutdown(Some(duration));
446     }
447 
448     /// Shuts down the runtime, without waiting for any spawned work to stop.
449     ///
450     /// This can be useful if you want to drop a runtime from within another runtime.
451     /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
452     /// to complete, which would normally not be permitted within an asynchronous context.
453     /// By calling `shutdown_background()`, you can drop the runtime from such a context.
454     ///
455     /// Note however, that because we do not wait for any blocking tasks to complete, this
456     /// may result in a resource leak (in that any blocking tasks are still running until they
457     /// return.
458     ///
459     /// See the [struct level documentation](Runtime#shutdown) for more details.
460     ///
461     /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
462     ///
463     /// ```
464     /// use tokio::runtime::Runtime;
465     ///
466     /// fn main() {
467     ///    let runtime = Runtime::new().unwrap();
468     ///
469     ///    runtime.block_on(async move {
470     ///        let inner_runtime = Runtime::new().unwrap();
471     ///        // ...
472     ///        inner_runtime.shutdown_background();
473     ///    });
474     /// }
475     /// ```
shutdown_background(self)476     pub fn shutdown_background(self) {
477         self.shutdown_timeout(Duration::from_nanos(0));
478     }
479 
480     /// Returns a view that lets you get information about how the runtime
481     /// is performing.
metrics(&self) -> crate::runtime::RuntimeMetrics482     pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
483         self.handle.metrics()
484     }
485 }
486 
487 #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
488 impl Drop for Runtime {
drop(&mut self)489     fn drop(&mut self) {
490         match &mut self.scheduler {
491             Scheduler::CurrentThread(current_thread) => {
492                 // This ensures that tasks spawned on the current-thread
493                 // runtime are dropped inside the runtime's context.
494                 let _guard = context::try_set_current(&self.handle.inner);
495                 current_thread.shutdown(&self.handle.inner);
496             }
497             #[cfg(feature = "rt-multi-thread")]
498             Scheduler::MultiThread(multi_thread) => {
499                 // The threaded scheduler drops its tasks on its worker threads, which is
500                 // already in the runtime's context.
501                 multi_thread.shutdown(&self.handle.inner);
502             }
503             #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
504             Scheduler::MultiThreadAlt(multi_thread) => {
505                 // The threaded scheduler drops its tasks on its worker threads, which is
506                 // already in the runtime's context.
507                 multi_thread.shutdown(&self.handle.inner);
508             }
509         }
510     }
511 }
512 
// `Runtime` explicitly opts in to both unwind-safety marker traits, so a
// `Runtime` (or a reference to one) may be captured by a closure passed to
// `std::panic::catch_unwind`.
// NOTE(review): the reason these are implemented by hand rather than derived
// automatically is not visible in this file — confirm before removing them.
impl std::panic::UnwindSafe for Runtime {}

impl std::panic::RefUnwindSafe for Runtime {}
516