1 use super::BOX_FUTURE_THRESHOLD; 2 use crate::runtime::blocking::BlockingPool; 3 use crate::runtime::scheduler::CurrentThread; 4 use crate::runtime::{context, EnterGuard, Handle}; 5 use crate::task::JoinHandle; 6 use crate::util::trace::SpawnMeta; 7 8 use std::future::Future; 9 use std::mem; 10 use std::time::Duration; 11 12 cfg_rt_multi_thread! { 13 use crate::runtime::Builder; 14 use crate::runtime::scheduler::MultiThread; 15 16 cfg_unstable! { 17 use crate::runtime::scheduler::MultiThreadAlt; 18 } 19 } 20 21 /// The Tokio runtime. 22 /// 23 /// The runtime provides an I/O driver, task scheduler, [timer], and 24 /// blocking pool, necessary for running asynchronous tasks. 25 /// 26 /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. 27 /// However, most users will use the [`#[tokio::main]`][main] annotation on 28 /// their entry point instead. 29 /// 30 /// See [module level][mod] documentation for more details. 31 /// 32 /// # Shutdown 33 /// 34 /// Shutting down the runtime is done by dropping the value, or calling 35 /// [`shutdown_background`] or [`shutdown_timeout`]. 36 /// 37 /// Tasks spawned through [`Runtime::spawn`] keep running until they yield. 38 /// Then they are dropped. They are not *guaranteed* to run to completion, but 39 /// *might* do so if they do not yield until completion. 40 /// 41 /// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running 42 /// until they return. 43 /// 44 /// The thread initiating the shutdown blocks until all spawned work has been 45 /// stopped. This can take an indefinite amount of time. The `Drop` 46 /// implementation waits forever for this. 47 /// 48 /// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if 49 /// waiting forever is undesired. When the timeout is reached, spawned work that 50 /// did not stop in time and threads running it are leaked. The work continues 51 /// to run until one of the stopping conditions is fulfilled, but the thread 52 /// initiating the shutdown is unblocked. 53 /// 54 /// Once the runtime has been dropped, any outstanding I/O resources bound to 55 /// it will no longer function. Calling any method on them will result in an 56 /// error. 57 /// 58 /// # Sharing 59 /// 60 /// There are several ways to establish shared access to a Tokio runtime: 61 /// 62 /// * Using an <code>[Arc]\<Runtime></code>. 63 /// * Using a [`Handle`]. 64 /// * Entering the runtime context. 65 /// 66 /// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various 67 /// things with the runtime such as spawning new tasks or entering the runtime 68 /// context. Both types can be cloned to create a new handle that allows access 69 /// to the same runtime. By passing clones into different tasks or threads, you 70 /// will be able to access the runtime from those tasks or threads. 71 /// 72 /// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that 73 /// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down, 74 /// whereas a [`Handle`] does not prevent that. This is because shutdown of the 75 /// runtime happens when the destructor of the `Runtime` object runs. 76 /// 77 /// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive 78 /// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>, 79 /// this can be achieved via [`Arc::try_unwrap`] when only one strong count 80 /// reference is left over. 81 /// 82 /// The runtime context is entered using the [`Runtime::enter`] or 83 /// [`Handle::enter`] methods, which use a thread-local variable to store the 84 /// current runtime. Whenever you are inside the runtime context, methods such 85 /// as [`tokio::spawn`] will use the runtime whose context you are inside. 86 /// 87 /// [timer]: crate::time 88 /// [mod]: index.html 89 /// [`new`]: method@Self::new 90 /// [`Builder`]: struct@Builder 91 /// [`Handle`]: struct@Handle 92 /// [main]: macro@crate::main 93 /// [`tokio::spawn`]: crate::spawn 94 /// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap 95 /// [Arc]: std::sync::Arc 96 /// [`shutdown_background`]: method@Runtime::shutdown_background 97 /// [`shutdown_timeout`]: method@Runtime::shutdown_timeout 98 #[derive(Debug)] 99 pub struct Runtime { 100 /// Task scheduler 101 scheduler: Scheduler, 102 103 /// Handle to runtime, also contains driver handles 104 handle: Handle, 105 106 /// Blocking pool handle, used to signal shutdown 107 blocking_pool: BlockingPool, 108 } 109 110 /// The flavor of a `Runtime`. 111 /// 112 /// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()). 113 #[derive(Debug, PartialEq, Eq)] 114 #[non_exhaustive] 115 pub enum RuntimeFlavor { 116 /// The flavor that executes all tasks on the current thread. 117 CurrentThread, 118 /// The flavor that executes tasks across multiple threads. 119 MultiThread, 120 /// The flavor that executes tasks across multiple threads. 121 #[cfg(tokio_unstable)] 122 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] 123 MultiThreadAlt, 124 } 125 126 /// The runtime scheduler is either a multi-thread or a current-thread executor. 127 #[derive(Debug)] 128 pub(super) enum Scheduler { 129 /// Execute all tasks on the current-thread. 130 CurrentThread(CurrentThread), 131 132 /// Execute tasks across multiple threads. 133 #[cfg(feature = "rt-multi-thread")] 134 MultiThread(MultiThread), 135 136 /// Execute tasks across multiple threads. 137 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 138 MultiThreadAlt(MultiThreadAlt), 139 } 140 141 impl Runtime { from_parts( scheduler: Scheduler, handle: Handle, blocking_pool: BlockingPool, ) -> Runtime142 pub(super) fn from_parts( 143 scheduler: Scheduler, 144 handle: Handle, 145 blocking_pool: BlockingPool, 146 ) -> Runtime { 147 Runtime { 148 scheduler, 149 handle, 150 blocking_pool, 151 } 152 } 153 154 /// Creates a new runtime instance with default configuration values. 155 /// 156 /// This results in the multi threaded scheduler, I/O driver, and time driver being 157 /// initialized. 158 /// 159 /// Most applications will not need to call this function directly. Instead, 160 /// they will use the [`#[tokio::main]` attribute][main]. When a more complex 161 /// configuration is necessary, the [runtime builder] may be used. 162 /// 163 /// See [module level][mod] documentation for more details. 164 /// 165 /// # Examples 166 /// 167 /// Creating a new `Runtime` with default configuration values. 168 /// 169 /// ``` 170 /// use tokio::runtime::Runtime; 171 /// 172 /// let rt = Runtime::new() 173 /// .unwrap(); 174 /// 175 /// // Use the runtime... 176 /// ``` 177 /// 178 /// [mod]: index.html 179 /// [main]: ../attr.main.html 180 /// [threaded scheduler]: index.html#threaded-scheduler 181 /// [runtime builder]: crate::runtime::Builder 182 #[cfg(feature = "rt-multi-thread")] 183 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] new() -> std::io::Result<Runtime>184 pub fn new() -> std::io::Result<Runtime> { 185 Builder::new_multi_thread().enable_all().build() 186 } 187 188 /// Returns a handle to the runtime's spawner. 189 /// 190 /// The returned handle can be used to spawn tasks that run on this runtime, and can 191 /// be cloned to allow moving the `Handle` to other threads. 192 /// 193 /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone. 194 /// Refer to the documentation of [`Handle::block_on`] for more. 195 /// 196 /// # Examples 197 /// 198 /// ``` 199 /// use tokio::runtime::Runtime; 200 /// 201 /// let rt = Runtime::new() 202 /// .unwrap(); 203 /// 204 /// let handle = rt.handle(); 205 /// 206 /// // Use the handle... 207 /// ``` handle(&self) -> &Handle208 pub fn handle(&self) -> &Handle { 209 &self.handle 210 } 211 212 /// Spawns a future onto the Tokio runtime. 213 /// 214 /// This spawns the given future onto the runtime's executor, usually a 215 /// thread pool. The thread pool is then responsible for polling the future 216 /// until it completes. 217 /// 218 /// The provided future will start running in the background immediately 219 /// when `spawn` is called, even if you don't await the returned 220 /// `JoinHandle`. 221 /// 222 /// See [module level][mod] documentation for more details. 223 /// 224 /// [mod]: index.html 225 /// 226 /// # Examples 227 /// 228 /// ``` 229 /// use tokio::runtime::Runtime; 230 /// 231 /// # fn dox() { 232 /// // Create the runtime 233 /// let rt = Runtime::new().unwrap(); 234 /// 235 /// // Spawn a future onto the runtime 236 /// rt.spawn(async { 237 /// println!("now running on a worker thread"); 238 /// }); 239 /// # } 240 /// ``` 241 #[track_caller] spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,242 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> 243 where 244 F: Future + Send + 'static, 245 F::Output: Send + 'static, 246 { 247 let fut_size = mem::size_of::<F>(); 248 if fut_size > BOX_FUTURE_THRESHOLD { 249 self.handle 250 .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) 251 } else { 252 self.handle 253 .spawn_named(future, SpawnMeta::new_unnamed(fut_size)) 254 } 255 } 256 257 /// Runs the provided function on an executor dedicated to blocking operations. 258 /// 259 /// # Examples 260 /// 261 /// ``` 262 /// use tokio::runtime::Runtime; 263 /// 264 /// # fn dox() { 265 /// // Create the runtime 266 /// let rt = Runtime::new().unwrap(); 267 /// 268 /// // Spawn a blocking function onto the runtime 269 /// rt.spawn_blocking(|| { 270 /// println!("now running on a worker thread"); 271 /// }); 272 /// # } 273 /// ``` 274 #[track_caller] spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,275 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> 276 where 277 F: FnOnce() -> R + Send + 'static, 278 R: Send + 'static, 279 { 280 self.handle.spawn_blocking(func) 281 } 282 283 /// Runs a future to completion on the Tokio runtime. This is the 284 /// runtime's entry point. 285 /// 286 /// This runs the given future on the current thread, blocking until it is 287 /// complete, and yielding its resolved result. Any tasks or timers 288 /// which the future spawns internally will be executed on the runtime. 289 /// 290 /// # Non-worker future 291 /// 292 /// Note that the future required by this function does not run as a 293 /// worker. The expectation is that other tasks are spawned by the future here. 294 /// Awaiting on other futures from the future provided here will not 295 /// perform as fast as those spawned as workers. 296 /// 297 /// # Multi thread scheduler 298 /// 299 /// When the multi thread scheduler is used this will allow futures 300 /// to run within the io driver and timer context of the overall runtime. 301 /// 302 /// Any spawned tasks will continue running after `block_on` returns. 303 /// 304 /// # Current thread scheduler 305 /// 306 /// When the current thread scheduler is enabled `block_on` 307 /// can be called concurrently from multiple threads. The first call 308 /// will take ownership of the io and timer drivers. This means 309 /// other threads which do not own the drivers will hook into that one. 310 /// When the first `block_on` completes, other threads will be able to 311 /// "steal" the driver to allow continued execution of their futures. 312 /// 313 /// Any spawned tasks will be suspended after `block_on` returns. Calling 314 /// `block_on` again will resume previously spawned tasks. 315 /// 316 /// # Panics 317 /// 318 /// This function panics if the provided future panics, or if called within an 319 /// asynchronous execution context. 320 /// 321 /// # Examples 322 /// 323 /// ```no_run 324 /// use tokio::runtime::Runtime; 325 /// 326 /// // Create the runtime 327 /// let rt = Runtime::new().unwrap(); 328 /// 329 /// // Execute the future, blocking the current thread until completion 330 /// rt.block_on(async { 331 /// println!("hello"); 332 /// }); 333 /// ``` 334 /// 335 /// [handle]: fn@Handle::block_on 336 #[track_caller] block_on<F: Future>(&self, future: F) -> F::Output337 pub fn block_on<F: Future>(&self, future: F) -> F::Output { 338 let fut_size = mem::size_of::<F>(); 339 if fut_size > BOX_FUTURE_THRESHOLD { 340 self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) 341 } else { 342 self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) 343 } 344 } 345 346 #[track_caller] block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output347 fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { 348 #[cfg(all( 349 tokio_unstable, 350 tokio_taskdump, 351 feature = "rt", 352 target_os = "linux", 353 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") 354 ))] 355 let future = super::task::trace::Trace::root(future); 356 357 #[cfg(all(tokio_unstable, feature = "tracing"))] 358 let future = crate::util::trace::task( 359 future, 360 "block_on", 361 _meta, 362 crate::runtime::task::Id::next().as_u64(), 363 ); 364 365 let _enter = self.enter(); 366 367 match &self.scheduler { 368 Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future), 369 #[cfg(feature = "rt-multi-thread")] 370 Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future), 371 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 372 Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future), 373 } 374 } 375 376 /// Enters the runtime context. 377 /// 378 /// This allows you to construct types that must have an executor 379 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will 380 /// also allow you to call methods such as [`tokio::spawn`]. 381 /// 382 /// [`Sleep`]: struct@crate::time::Sleep 383 /// [`TcpStream`]: struct@crate::net::TcpStream 384 /// [`tokio::spawn`]: fn@crate::spawn 385 /// 386 /// # Example 387 /// 388 /// ``` 389 /// use tokio::runtime::Runtime; 390 /// use tokio::task::JoinHandle; 391 /// 392 /// fn function_that_spawns(msg: String) -> JoinHandle<()> { 393 /// // Had we not used `rt.enter` below, this would panic. 394 /// tokio::spawn(async move { 395 /// println!("{}", msg); 396 /// }) 397 /// } 398 /// 399 /// fn main() { 400 /// let rt = Runtime::new().unwrap(); 401 /// 402 /// let s = "Hello World!".to_string(); 403 /// 404 /// // By entering the context, we tie `tokio::spawn` to this executor. 405 /// let _guard = rt.enter(); 406 /// let handle = function_that_spawns(s); 407 /// 408 /// // Wait for the task before we end the test. 409 /// rt.block_on(handle).unwrap(); 410 /// } 411 /// ``` enter(&self) -> EnterGuard<'_>412 pub fn enter(&self) -> EnterGuard<'_> { 413 self.handle.enter() 414 } 415 416 /// Shuts down the runtime, waiting for at most `duration` for all spawned 417 /// work to stop. 418 /// 419 /// See the [struct level documentation](Runtime#shutdown) for more details. 420 /// 421 /// # Examples 422 /// 423 /// ``` 424 /// use tokio::runtime::Runtime; 425 /// use tokio::task; 426 /// 427 /// use std::thread; 428 /// use std::time::Duration; 429 /// 430 /// fn main() { 431 /// let runtime = Runtime::new().unwrap(); 432 /// 433 /// runtime.block_on(async move { 434 /// task::spawn_blocking(move || { 435 /// thread::sleep(Duration::from_secs(10_000)); 436 /// }); 437 /// }); 438 /// 439 /// runtime.shutdown_timeout(Duration::from_millis(100)); 440 /// } 441 /// ``` shutdown_timeout(mut self, duration: Duration)442 pub fn shutdown_timeout(mut self, duration: Duration) { 443 // Wakeup and shutdown all the worker threads 444 self.handle.inner.shutdown(); 445 self.blocking_pool.shutdown(Some(duration)); 446 } 447 448 /// Shuts down the runtime, without waiting for any spawned work to stop. 449 /// 450 /// This can be useful if you want to drop a runtime from within another runtime. 451 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks 452 /// to complete, which would normally not be permitted within an asynchronous context. 453 /// By calling `shutdown_background()`, you can drop the runtime from such a context. 454 /// 455 /// Note however, that because we do not wait for any blocking tasks to complete, this 456 /// may result in a resource leak (in that any blocking tasks are still running until they 457 /// return. 458 /// 459 /// See the [struct level documentation](Runtime#shutdown) for more details. 460 /// 461 /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`. 462 /// 463 /// ``` 464 /// use tokio::runtime::Runtime; 465 /// 466 /// fn main() { 467 /// let runtime = Runtime::new().unwrap(); 468 /// 469 /// runtime.block_on(async move { 470 /// let inner_runtime = Runtime::new().unwrap(); 471 /// // ... 472 /// inner_runtime.shutdown_background(); 473 /// }); 474 /// } 475 /// ``` shutdown_background(self)476 pub fn shutdown_background(self) { 477 self.shutdown_timeout(Duration::from_nanos(0)); 478 } 479 480 /// Returns a view that lets you get information about how the runtime 481 /// is performing. metrics(&self) -> crate::runtime::RuntimeMetrics482 pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { 483 self.handle.metrics() 484 } 485 } 486 487 #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let 488 impl Drop for Runtime { drop(&mut self)489 fn drop(&mut self) { 490 match &mut self.scheduler { 491 Scheduler::CurrentThread(current_thread) => { 492 // This ensures that tasks spawned on the current-thread 493 // runtime are dropped inside the runtime's context. 494 let _guard = context::try_set_current(&self.handle.inner); 495 current_thread.shutdown(&self.handle.inner); 496 } 497 #[cfg(feature = "rt-multi-thread")] 498 Scheduler::MultiThread(multi_thread) => { 499 // The threaded scheduler drops its tasks on its worker threads, which is 500 // already in the runtime's context. 501 multi_thread.shutdown(&self.handle.inner); 502 } 503 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 504 Scheduler::MultiThreadAlt(multi_thread) => { 505 // The threaded scheduler drops its tasks on its worker threads, which is 506 // already in the runtime's context. 507 multi_thread.shutdown(&self.handle.inner); 508 } 509 } 510 } 511 } 512 513 impl std::panic::UnwindSafe for Runtime {} 514 515 impl std::panic::RefUnwindSafe for Runtime {} 516