1 #![cfg_attr(loom, allow(unused_imports))] 2 3 use crate::runtime::handle::Handle; 4 use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback}; 5 #[cfg(tokio_unstable)] 6 use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta}; 7 use crate::util::rand::{RngSeed, RngSeedGenerator}; 8 9 use crate::runtime::blocking::BlockingPool; 10 use crate::runtime::scheduler::CurrentThread; 11 use std::fmt; 12 use std::io; 13 use std::thread::ThreadId; 14 use std::time::Duration; 15 16 /// Builds Tokio Runtime with custom configuration values. 17 /// 18 /// Methods can be chained in order to set the configuration values. The 19 /// Runtime is constructed by calling [`build`]. 20 /// 21 /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`] 22 /// or [`Builder::new_current_thread`]. 23 /// 24 /// See function level documentation for details on the various configuration 25 /// settings. 26 /// 27 /// [`build`]: method@Self::build 28 /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread 29 /// [`Builder::new_current_thread`]: method@Self::new_current_thread 30 /// 31 /// # Examples 32 /// 33 /// ``` 34 /// use tokio::runtime::Builder; 35 /// 36 /// fn main() { 37 /// // build runtime 38 /// let runtime = Builder::new_multi_thread() 39 /// .worker_threads(4) 40 /// .thread_name("my-custom-name") 41 /// .thread_stack_size(3 * 1024 * 1024) 42 /// .build() 43 /// .unwrap(); 44 /// 45 /// // use runtime ... 46 /// } 47 /// ``` 48 pub struct Builder { 49 /// Runtime type 50 kind: Kind, 51 52 /// Whether or not to enable the I/O driver 53 enable_io: bool, 54 nevents: usize, 55 56 /// Whether or not to enable the time driver 57 enable_time: bool, 58 59 /// Whether or not the clock should start paused. 60 start_paused: bool, 61 62 /// The number of worker threads, used by Runtime. 63 /// 64 /// Only used when not using the current-thread executor. 65 worker_threads: Option<usize>, 66 67 /// Cap on thread usage. 68 max_blocking_threads: usize, 69 70 /// Name fn used for threads spawned by the runtime. 71 pub(super) thread_name: ThreadNameFn, 72 73 /// Stack size used for threads spawned by the runtime. 74 pub(super) thread_stack_size: Option<usize>, 75 76 /// Callback to run after each thread starts. 77 pub(super) after_start: Option<Callback>, 78 79 /// To run before each worker thread stops 80 pub(super) before_stop: Option<Callback>, 81 82 /// To run before each worker thread is parked. 83 pub(super) before_park: Option<Callback>, 84 85 /// To run after each thread is unparked. 86 pub(super) after_unpark: Option<Callback>, 87 88 /// To run before each task is spawned. 89 pub(super) before_spawn: Option<TaskCallback>, 90 91 /// To run after each task is terminated. 92 pub(super) after_termination: Option<TaskCallback>, 93 94 /// Customizable keep alive timeout for `BlockingPool` 95 pub(super) keep_alive: Option<Duration>, 96 97 /// How many ticks before pulling a task from the global/remote queue? 98 /// 99 /// When `None`, the value is unspecified and behavior details are left to 100 /// the scheduler. Each scheduler flavor could choose to either pick its own 101 /// default value or use some other strategy to decide when to poll from the 102 /// global queue. For example, the multi-threaded scheduler uses a 103 /// self-tuning strategy based on mean task poll times. 104 pub(super) global_queue_interval: Option<u32>, 105 106 /// How many ticks before yielding to the driver for timer and I/O events? 107 pub(super) event_interval: u32, 108 109 pub(super) local_queue_capacity: usize, 110 111 /// When true, the multi-threade scheduler LIFO slot should not be used. 112 /// 113 /// This option should only be exposed as unstable. 114 pub(super) disable_lifo_slot: bool, 115 116 /// Specify a random number generator seed to provide deterministic results 117 pub(super) seed_generator: RngSeedGenerator, 118 119 /// When true, enables task poll count histogram instrumentation. 120 pub(super) metrics_poll_count_histogram_enable: bool, 121 122 /// Configures the task poll count histogram 123 pub(super) metrics_poll_count_histogram: HistogramBuilder, 124 125 #[cfg(tokio_unstable)] 126 pub(super) unhandled_panic: UnhandledPanic, 127 } 128 129 cfg_unstable! { 130 /// How the runtime should respond to unhandled panics. 131 /// 132 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic` 133 /// to configure the runtime behavior when a spawned task panics. 134 /// 135 /// See [`Builder::unhandled_panic`] for more details. 136 #[derive(Debug, Clone)] 137 #[non_exhaustive] 138 pub enum UnhandledPanic { 139 /// The runtime should ignore panics on spawned tasks. 140 /// 141 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned 142 /// tasks continue running normally. 143 /// 144 /// This is the default behavior. 145 /// 146 /// # Examples 147 /// 148 /// ``` 149 /// use tokio::runtime::{self, UnhandledPanic}; 150 /// 151 /// # pub fn main() { 152 /// let rt = runtime::Builder::new_current_thread() 153 /// .unhandled_panic(UnhandledPanic::Ignore) 154 /// .build() 155 /// .unwrap(); 156 /// 157 /// let task1 = rt.spawn(async { panic!("boom"); }); 158 /// let task2 = rt.spawn(async { 159 /// // This task completes normally 160 /// "done" 161 /// }); 162 /// 163 /// rt.block_on(async { 164 /// // The panic on the first task is forwarded to the `JoinHandle` 165 /// assert!(task1.await.is_err()); 166 /// 167 /// // The second task completes normally 168 /// assert!(task2.await.is_ok()); 169 /// }) 170 /// # } 171 /// ``` 172 /// 173 /// [`JoinHandle`]: struct@crate::task::JoinHandle 174 Ignore, 175 176 /// The runtime should immediately shutdown if a spawned task panics. 177 /// 178 /// The runtime will immediately shutdown even if the panicked task's 179 /// [`JoinHandle`] is still available. All further spawned tasks will be 180 /// immediately dropped and call to [`Runtime::block_on`] will panic. 181 /// 182 /// # Examples 183 /// 184 /// ```should_panic 185 /// use tokio::runtime::{self, UnhandledPanic}; 186 /// 187 /// # pub fn main() { 188 /// let rt = runtime::Builder::new_current_thread() 189 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) 190 /// .build() 191 /// .unwrap(); 192 /// 193 /// rt.spawn(async { panic!("boom"); }); 194 /// rt.spawn(async { 195 /// // This task never completes. 196 /// }); 197 /// 198 /// rt.block_on(async { 199 /// // Do some work 200 /// # loop { tokio::task::yield_now().await; } 201 /// }) 202 /// # } 203 /// ``` 204 /// 205 /// [`JoinHandle`]: struct@crate::task::JoinHandle 206 ShutdownRuntime, 207 } 208 } 209 210 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; 211 212 #[derive(Clone, Copy)] 213 pub(crate) enum Kind { 214 CurrentThread, 215 #[cfg(feature = "rt-multi-thread")] 216 MultiThread, 217 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 218 MultiThreadAlt, 219 } 220 221 impl Builder { 222 /// Returns a new builder with the current thread scheduler selected. 223 /// 224 /// Configuration methods can be chained on the return value. 225 /// 226 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a 227 /// [`LocalSet`]. 228 /// 229 /// [`LocalSet`]: crate::task::LocalSet new_current_thread() -> Builder230 pub fn new_current_thread() -> Builder { 231 #[cfg(loom)] 232 const EVENT_INTERVAL: u32 = 4; 233 // The number `61` is fairly arbitrary. I believe this value was copied from golang. 234 #[cfg(not(loom))] 235 const EVENT_INTERVAL: u32 = 61; 236 237 Builder::new(Kind::CurrentThread, EVENT_INTERVAL) 238 } 239 240 /// Returns a new builder with the multi thread scheduler selected. 241 /// 242 /// Configuration methods can be chained on the return value. 243 #[cfg(feature = "rt-multi-thread")] 244 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] new_multi_thread() -> Builder245 pub fn new_multi_thread() -> Builder { 246 // The number `61` is fairly arbitrary. I believe this value was copied from golang. 247 Builder::new(Kind::MultiThread, 61) 248 } 249 250 cfg_unstable! { 251 /// Returns a new builder with the alternate multi thread scheduler 252 /// selected. 253 /// 254 /// The alternate multi threaded scheduler is an in-progress 255 /// candidate to replace the existing multi threaded scheduler. It 256 /// currently does not scale as well to 16+ processors. 257 /// 258 /// This runtime flavor is currently **not considered production 259 /// ready**. 260 /// 261 /// Configuration methods can be chained on the return value. 262 #[cfg(feature = "rt-multi-thread")] 263 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] 264 pub fn new_multi_thread_alt() -> Builder { 265 // The number `61` is fairly arbitrary. I believe this value was copied from golang. 266 Builder::new(Kind::MultiThreadAlt, 61) 267 } 268 } 269 270 /// Returns a new runtime builder initialized with default configuration 271 /// values. 272 /// 273 /// Configuration methods can be chained on the return value. new(kind: Kind, event_interval: u32) -> Builder274 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder { 275 Builder { 276 kind, 277 278 // I/O defaults to "off" 279 enable_io: false, 280 nevents: 1024, 281 282 // Time defaults to "off" 283 enable_time: false, 284 285 // The clock starts not-paused 286 start_paused: false, 287 288 // Read from environment variable first in multi-threaded mode. 289 // Default to lazy auto-detection (one thread per CPU core) 290 worker_threads: None, 291 292 max_blocking_threads: 512, 293 294 // Default thread name 295 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), 296 297 // Do not set a stack size by default 298 thread_stack_size: None, 299 300 // No worker thread callbacks 301 after_start: None, 302 before_stop: None, 303 before_park: None, 304 after_unpark: None, 305 306 before_spawn: None, 307 after_termination: None, 308 309 keep_alive: None, 310 311 // Defaults for these values depend on the scheduler kind, so we get them 312 // as parameters. 313 global_queue_interval: None, 314 event_interval, 315 316 #[cfg(not(loom))] 317 local_queue_capacity: 256, 318 319 #[cfg(loom)] 320 local_queue_capacity: 4, 321 322 seed_generator: RngSeedGenerator::new(RngSeed::new()), 323 324 #[cfg(tokio_unstable)] 325 unhandled_panic: UnhandledPanic::Ignore, 326 327 metrics_poll_count_histogram_enable: false, 328 329 metrics_poll_count_histogram: HistogramBuilder::default(), 330 331 disable_lifo_slot: false, 332 } 333 } 334 335 /// Enables both I/O and time drivers. 336 /// 337 /// Doing this is a shorthand for calling `enable_io` and `enable_time` 338 /// individually. If additional components are added to Tokio in the future, 339 /// `enable_all` will include these future components. 340 /// 341 /// # Examples 342 /// 343 /// ``` 344 /// use tokio::runtime; 345 /// 346 /// let rt = runtime::Builder::new_multi_thread() 347 /// .enable_all() 348 /// .build() 349 /// .unwrap(); 350 /// ``` enable_all(&mut self) -> &mut Self351 pub fn enable_all(&mut self) -> &mut Self { 352 #[cfg(any( 353 feature = "net", 354 all(unix, feature = "process"), 355 all(unix, feature = "signal") 356 ))] 357 self.enable_io(); 358 #[cfg(feature = "time")] 359 self.enable_time(); 360 361 self 362 } 363 364 /// Sets the number of worker threads the `Runtime` will use. 365 /// 366 /// This can be any number above 0 though it is advised to keep this value 367 /// on the smaller side. 368 /// 369 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`. 370 /// 371 /// # Default 372 /// 373 /// The default value is the number of cores available to the system. 374 /// 375 /// When using the `current_thread` runtime this method has no effect. 376 /// 377 /// # Examples 378 /// 379 /// ## Multi threaded runtime with 4 threads 380 /// 381 /// ``` 382 /// use tokio::runtime; 383 /// 384 /// // This will spawn a work-stealing runtime with 4 worker threads. 385 /// let rt = runtime::Builder::new_multi_thread() 386 /// .worker_threads(4) 387 /// .build() 388 /// .unwrap(); 389 /// 390 /// rt.spawn(async move {}); 391 /// ``` 392 /// 393 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) 394 /// 395 /// ``` 396 /// use tokio::runtime; 397 /// 398 /// // Create a runtime that _must_ be driven from a call 399 /// // to `Runtime::block_on`. 400 /// let rt = runtime::Builder::new_current_thread() 401 /// .build() 402 /// .unwrap(); 403 /// 404 /// // This will run the runtime and future on the current thread 405 /// rt.block_on(async move {}); 406 /// ``` 407 /// 408 /// # Panics 409 /// 410 /// This will panic if `val` is not larger than `0`. 411 #[track_caller] worker_threads(&mut self, val: usize) -> &mut Self412 pub fn worker_threads(&mut self, val: usize) -> &mut Self { 413 assert!(val > 0, "Worker threads cannot be set to 0"); 414 self.worker_threads = Some(val); 415 self 416 } 417 418 /// Specifies the limit for additional threads spawned by the Runtime. 419 /// 420 /// These threads are used for blocking operations like tasks spawned 421 /// through [`spawn_blocking`], this includes but is not limited to: 422 /// - [`fs`] operations 423 /// - dns resolution through [`ToSocketAddrs`] 424 /// - writing to [`Stdout`] or [`Stderr`] 425 /// - reading from [`Stdin`] 426 /// 427 /// Unlike the [`worker_threads`], they are not always active and will exit 428 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`]. 429 /// 430 /// It's recommended to not set this limit too low in order to avoid hanging on operations 431 /// requiring [`spawn_blocking`]. 432 /// 433 /// The default value is 512. 434 /// 435 /// # Panics 436 /// 437 /// This will panic if `val` is not larger than `0`. 438 /// 439 /// # Upgrading from 0.x 440 /// 441 /// In old versions `max_threads` limited both blocking and worker threads, but the 442 /// current `max_blocking_threads` does not include async worker threads in the count. 443 /// 444 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking 445 /// [`fs`]: mod@crate::fs 446 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs 447 /// [`Stdout`]: struct@crate::io::Stdout 448 /// [`Stdin`]: struct@crate::io::Stdin 449 /// [`Stderr`]: struct@crate::io::Stderr 450 /// [`worker_threads`]: Self::worker_threads 451 /// [`thread_keep_alive`]: Self::thread_keep_alive 452 #[track_caller] 453 #[cfg_attr(docsrs, doc(alias = "max_threads"))] max_blocking_threads(&mut self, val: usize) -> &mut Self454 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { 455 assert!(val > 0, "Max blocking threads cannot be set to 0"); 456 self.max_blocking_threads = val; 457 self 458 } 459 460 /// Sets name of threads spawned by the `Runtime`'s thread pool. 461 /// 462 /// The default name is "tokio-runtime-worker". 463 /// 464 /// # Examples 465 /// 466 /// ``` 467 /// # use tokio::runtime; 468 /// 469 /// # pub fn main() { 470 /// let rt = runtime::Builder::new_multi_thread() 471 /// .thread_name("my-pool") 472 /// .build(); 473 /// # } 474 /// ``` thread_name(&mut self, val: impl Into<String>) -> &mut Self475 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { 476 let val = val.into(); 477 self.thread_name = std::sync::Arc::new(move || val.clone()); 478 self 479 } 480 481 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool. 482 /// 483 /// The default name fn is `|| "tokio-runtime-worker".into()`. 484 /// 485 /// # Examples 486 /// 487 /// ``` 488 /// # use tokio::runtime; 489 /// # use std::sync::atomic::{AtomicUsize, Ordering}; 490 /// # pub fn main() { 491 /// let rt = runtime::Builder::new_multi_thread() 492 /// .thread_name_fn(|| { 493 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); 494 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); 495 /// format!("my-pool-{}", id) 496 /// }) 497 /// .build(); 498 /// # } 499 /// ``` thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static,500 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self 501 where 502 F: Fn() -> String + Send + Sync + 'static, 503 { 504 self.thread_name = std::sync::Arc::new(f); 505 self 506 } 507 508 /// Sets the stack size (in bytes) for worker threads. 509 /// 510 /// The actual stack size may be greater than this value if the platform 511 /// specifies minimal stack size. 512 /// 513 /// The default stack size for spawned threads is 2 MiB, though this 514 /// particular stack size is subject to change in the future. 515 /// 516 /// # Examples 517 /// 518 /// ``` 519 /// # use tokio::runtime; 520 /// 521 /// # pub fn main() { 522 /// let rt = runtime::Builder::new_multi_thread() 523 /// .thread_stack_size(32 * 1024) 524 /// .build(); 525 /// # } 526 /// ``` thread_stack_size(&mut self, val: usize) -> &mut Self527 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { 528 self.thread_stack_size = Some(val); 529 self 530 } 531 532 /// Executes function `f` after each thread is started but before it starts 533 /// doing work. 534 /// 535 /// This is intended for bookkeeping and monitoring use cases. 536 /// 537 /// # Examples 538 /// 539 /// ``` 540 /// # use tokio::runtime; 541 /// # pub fn main() { 542 /// let runtime = runtime::Builder::new_multi_thread() 543 /// .on_thread_start(|| { 544 /// println!("thread started"); 545 /// }) 546 /// .build(); 547 /// # } 548 /// ``` 549 #[cfg(not(loom))] on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,550 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self 551 where 552 F: Fn() + Send + Sync + 'static, 553 { 554 self.after_start = Some(std::sync::Arc::new(f)); 555 self 556 } 557 558 /// Executes function `f` before each thread stops. 559 /// 560 /// This is intended for bookkeeping and monitoring use cases. 561 /// 562 /// # Examples 563 /// 564 /// ``` 565 /// # use tokio::runtime; 566 /// # pub fn main() { 567 /// let runtime = runtime::Builder::new_multi_thread() 568 /// .on_thread_stop(|| { 569 /// println!("thread stopping"); 570 /// }) 571 /// .build(); 572 /// # } 573 /// ``` 574 #[cfg(not(loom))] on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,575 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self 576 where 577 F: Fn() + Send + Sync + 'static, 578 { 579 self.before_stop = Some(std::sync::Arc::new(f)); 580 self 581 } 582 583 /// Executes function `f` just before a thread is parked (goes idle). 584 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn) 585 /// can be called, and may result in this thread being unparked immediately. 586 /// 587 /// This can be used to start work only when the executor is idle, or for bookkeeping 588 /// and monitoring purposes. 589 /// 590 /// Note: There can only be one park callback for a runtime; calling this function 591 /// more than once replaces the last callback defined, rather than adding to it. 592 /// 593 /// # Examples 594 /// 595 /// ## Multithreaded executor 596 /// ``` 597 /// # use std::sync::Arc; 598 /// # use std::sync::atomic::{AtomicBool, Ordering}; 599 /// # use tokio::runtime; 600 /// # use tokio::sync::Barrier; 601 /// # pub fn main() { 602 /// let once = AtomicBool::new(true); 603 /// let barrier = Arc::new(Barrier::new(2)); 604 /// 605 /// let runtime = runtime::Builder::new_multi_thread() 606 /// .worker_threads(1) 607 /// .on_thread_park({ 608 /// let barrier = barrier.clone(); 609 /// move || { 610 /// let barrier = barrier.clone(); 611 /// if once.swap(false, Ordering::Relaxed) { 612 /// tokio::spawn(async move { barrier.wait().await; }); 613 /// } 614 /// } 615 /// }) 616 /// .build() 617 /// .unwrap(); 618 /// 619 /// runtime.block_on(async { 620 /// barrier.wait().await; 621 /// }) 622 /// # } 623 /// ``` 624 /// ## Current thread executor 625 /// ``` 626 /// # use std::sync::Arc; 627 /// # use std::sync::atomic::{AtomicBool, Ordering}; 628 /// # use tokio::runtime; 629 /// # use tokio::sync::Barrier; 630 /// # pub fn main() { 631 /// let once = AtomicBool::new(true); 632 /// let barrier = Arc::new(Barrier::new(2)); 633 /// 634 /// let runtime = runtime::Builder::new_current_thread() 635 /// .on_thread_park({ 636 /// let barrier = barrier.clone(); 637 /// move || { 638 /// let barrier = barrier.clone(); 639 /// if once.swap(false, Ordering::Relaxed) { 640 /// tokio::spawn(async move { barrier.wait().await; }); 641 /// } 642 /// } 643 /// }) 644 /// .build() 645 /// .unwrap(); 646 /// 647 /// runtime.block_on(async { 648 /// barrier.wait().await; 649 /// }) 650 /// # } 651 /// ``` 652 #[cfg(not(loom))] on_thread_park<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,653 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self 654 where 655 F: Fn() + Send + Sync + 'static, 656 { 657 self.before_park = Some(std::sync::Arc::new(f)); 658 self 659 } 660 661 /// Executes function `f` just after a thread unparks (starts executing tasks). 662 /// 663 /// This is intended for bookkeeping and monitoring use cases; note that work 664 /// in this callback will increase latencies when the application has allowed one or 665 /// more runtime threads to go idle. 666 /// 667 /// Note: There can only be one unpark callback for a runtime; calling this function 668 /// more than once replaces the last callback defined, rather than adding to it. 669 /// 670 /// # Examples 671 /// 672 /// ``` 673 /// # use tokio::runtime; 674 /// # pub fn main() { 675 /// let runtime = runtime::Builder::new_multi_thread() 676 /// .on_thread_unpark(|| { 677 /// println!("thread unparking"); 678 /// }) 679 /// .build(); 680 /// 681 /// runtime.unwrap().block_on(async { 682 /// tokio::task::yield_now().await; 683 /// println!("Hello from Tokio!"); 684 /// }) 685 /// # } 686 /// ``` 687 #[cfg(not(loom))] on_thread_unpark<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,688 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self 689 where 690 F: Fn() + Send + Sync + 'static, 691 { 692 self.after_unpark = Some(std::sync::Arc::new(f)); 693 self 694 } 695 696 /// Executes function `f` just before a task is spawned. 697 /// 698 /// `f` is called within the Tokio context, so functions like 699 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being 700 /// invoked immediately. 701 /// 702 /// This can be used for bookkeeping or monitoring purposes. 703 /// 704 /// Note: There can only be one spawn callback for a runtime; calling this function more 705 /// than once replaces the last callback defined, rather than adding to it. 706 /// 707 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time. 708 /// 709 /// **Note**: This is an [unstable API][unstable]. The public API of this type 710 /// may break in 1.x releases. See [the documentation on unstable 711 /// features][unstable] for details. 712 /// 713 /// [unstable]: crate#unstable-features 714 /// 715 /// # Examples 716 /// 717 /// ``` 718 /// # use tokio::runtime; 719 /// # pub fn main() { 720 /// let runtime = runtime::Builder::new_current_thread() 721 /// .on_task_spawn(|_| { 722 /// println!("spawning task"); 723 /// }) 724 /// .build() 725 /// .unwrap(); 726 /// 727 /// runtime.block_on(async { 728 /// tokio::task::spawn(std::future::ready(())); 729 /// 730 /// for _ in 0..64 { 731 /// tokio::task::yield_now().await; 732 /// } 733 /// }) 734 /// # } 735 /// ``` 736 #[cfg(all(not(loom), tokio_unstable))] 737 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] on_task_spawn<F>(&mut self, f: F) -> &mut Self where F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,738 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self 739 where 740 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, 741 { 742 self.before_spawn = Some(std::sync::Arc::new(f)); 743 self 744 } 745 746 /// Executes function `f` just after a task is terminated. 747 /// 748 /// `f` is called within the Tokio context, so functions like 749 /// [`tokio::spawn`](crate::spawn) can be called. 750 /// 751 /// This can be used for bookkeeping or monitoring purposes. 752 /// 753 /// Note: There can only be one task termination callback for a runtime; calling this 754 /// function more than once replaces the last callback defined, rather than adding to it. 755 /// 756 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time. 757 /// 758 /// **Note**: This is an [unstable API][unstable]. The public API of this type 759 /// may break in 1.x releases. See [the documentation on unstable 760 /// features][unstable] for details. 761 /// 762 /// [unstable]: crate#unstable-features 763 /// 764 /// # Examples 765 /// 766 /// ``` 767 /// # use tokio::runtime; 768 /// # pub fn main() { 769 /// let runtime = runtime::Builder::new_current_thread() 770 /// .on_task_terminate(|_| { 771 /// println!("killing task"); 772 /// }) 773 /// .build() 774 /// .unwrap(); 775 /// 776 /// runtime.block_on(async { 777 /// tokio::task::spawn(std::future::ready(())); 778 /// 779 /// for _ in 0..64 { 780 /// tokio::task::yield_now().await; 781 /// } 782 /// }) 783 /// # } 784 /// ``` 785 #[cfg(all(not(loom), tokio_unstable))] 786 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] on_task_terminate<F>(&mut self, f: F) -> &mut Self where F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,787 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self 788 where 789 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, 790 { 791 self.after_termination = Some(std::sync::Arc::new(f)); 792 self 793 } 794 795 /// Creates the configured `Runtime`. 796 /// 797 /// The returned `Runtime` instance is ready to spawn tasks. 798 /// 799 /// # Examples 800 /// 801 /// ``` 802 /// use tokio::runtime::Builder; 803 /// 804 /// let rt = Builder::new_multi_thread().build().unwrap(); 805 /// 806 /// rt.block_on(async { 807 /// println!("Hello from the Tokio runtime"); 808 /// }); 809 /// ``` build(&mut self) -> io::Result<Runtime>810 pub fn build(&mut self) -> io::Result<Runtime> { 811 match &self.kind { 812 Kind::CurrentThread => self.build_current_thread_runtime(), 813 #[cfg(feature = "rt-multi-thread")] 814 Kind::MultiThread => self.build_threaded_runtime(), 815 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 816 Kind::MultiThreadAlt => self.build_alt_threaded_runtime(), 817 } 818 } 819 820 /// Creates the configured `LocalRuntime`. 821 /// 822 /// The returned `LocalRuntime` instance is ready to spawn tasks. 823 /// 824 /// # Panics 825 /// This will panic if `current_thread` is not the selected runtime flavor. 826 /// All other runtime flavors are unsupported by [`LocalRuntime`]. 827 /// 828 /// [`LocalRuntime`]: [crate::runtime::LocalRuntime] 829 /// 830 /// # Examples 831 /// 832 /// ``` 833 /// use tokio::runtime::Builder; 834 /// 835 /// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap(); 836 /// 837 /// rt.block_on(async { 838 /// println!("Hello from the Tokio runtime"); 839 /// }); 840 /// ``` 841 #[allow(unused_variables, unreachable_patterns)] 842 #[cfg(tokio_unstable)] 843 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime>844 pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> { 845 match &self.kind { 846 Kind::CurrentThread => self.build_current_thread_local_runtime(), 847 _ => panic!("Only current_thread is supported when building a local runtime"), 848 } 849 } 850 get_cfg(&self, workers: usize) -> driver::Cfg851 fn get_cfg(&self, workers: usize) -> driver::Cfg { 852 driver::Cfg { 853 enable_pause_time: match self.kind { 854 Kind::CurrentThread => true, 855 #[cfg(feature = "rt-multi-thread")] 856 Kind::MultiThread => false, 857 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 858 Kind::MultiThreadAlt => false, 859 }, 860 enable_io: self.enable_io, 861 enable_time: self.enable_time, 862 start_paused: self.start_paused, 863 nevents: self.nevents, 864 workers, 865 } 866 } 867 868 /// Sets a custom timeout for a thread in the blocking pool. 869 /// 870 /// By default, the timeout for a thread is set to 10 seconds. This can 871 /// be overridden using `.thread_keep_alive()`. 872 /// 873 /// # Example 874 /// 875 /// ``` 876 /// # use tokio::runtime; 877 /// # use std::time::Duration; 878 /// # pub fn main() { 879 /// let rt = runtime::Builder::new_multi_thread() 880 /// .thread_keep_alive(Duration::from_millis(100)) 881 /// .build(); 882 /// # } 883 /// ``` thread_keep_alive(&mut self, duration: Duration) -> &mut Self884 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { 885 self.keep_alive = Some(duration); 886 self 887 } 888 889 /// Sets the number of scheduler ticks after which the scheduler will poll the global 890 /// task queue. 891 /// 892 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. 893 /// 894 /// By default the global queue interval is 31 for the current-thread scheduler. Please see 895 /// [the module documentation] for the default behavior of the multi-thread scheduler. 896 /// 897 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming 898 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler, 899 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing 900 /// getting started on new work, especially if tasks frequently yield rather than complete 901 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and 902 /// is a good choice when most tasks quickly complete polling. 903 /// 904 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing 905 /// 906 /// # Panics 907 /// 908 /// This function will panic if 0 is passed as an argument. 909 /// 910 /// # Examples 911 /// 912 /// ``` 913 /// # use tokio::runtime; 914 /// # pub fn main() { 915 /// let rt = runtime::Builder::new_multi_thread() 916 /// .global_queue_interval(31) 917 /// .build(); 918 /// # } 919 /// ``` 920 #[track_caller] global_queue_interval(&mut self, val: u32) -> &mut Self921 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { 922 assert!(val > 0, "global_queue_interval must be greater than 0"); 923 self.global_queue_interval = Some(val); 924 self 925 } 926 927 /// Sets the number of scheduler ticks after which the scheduler will poll for 928 /// external events (timers, I/O, and so on). 929 /// 930 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. 931 /// 932 /// By default, the event interval is `61` for all scheduler types. 933 /// 934 /// Setting the event interval determines the effective "priority" of delivering 935 /// these external events (which may wake up additional tasks), compared to 936 /// executing tasks that are currently ready to run. A smaller value is useful 937 /// when tasks frequently spend a long time in polling, or frequently yield, 938 /// which can result in overly long delays picking up I/O events. Conversely, 939 /// picking up new events requires extra synchronization and syscall overhead, 940 /// so if tasks generally complete their polling quickly, a higher event interval 941 /// will minimize that overhead while still keeping the scheduler responsive to 942 /// events. 943 /// 944 /// # Examples 945 /// 946 /// ``` 947 /// # use tokio::runtime; 948 /// # pub fn main() { 949 /// let rt = runtime::Builder::new_multi_thread() 950 /// .event_interval(31) 951 /// .build(); 952 /// # } 953 /// ``` event_interval(&mut self, val: u32) -> &mut Self954 pub fn event_interval(&mut self, val: u32) -> &mut Self { 955 self.event_interval = val; 956 self 957 } 958 959 cfg_unstable! { 960 /// Configure how the runtime responds to an unhandled panic on a 961 /// spawned task. 962 /// 963 /// By default, an unhandled panic (i.e. a panic not caught by 964 /// [`std::panic::catch_unwind`]) has no impact on the runtime's 965 /// execution. The panic's error value is forwarded to the task's 966 /// [`JoinHandle`] and all other spawned tasks continue running. 967 /// 968 /// The `unhandled_panic` option enables configuring this behavior. 969 /// 970 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on 971 /// spawned tasks have no impact on the runtime's execution. 972 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to 973 /// shutdown immediately when a spawned task panics even if that 974 /// task's `JoinHandle` has not been dropped. All other spawned tasks 975 /// will immediately terminate and further calls to 976 /// [`Runtime::block_on`] will panic. 977 /// 978 /// # Panics 979 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`] 980 /// on a runtime other than the current thread runtime. 981 /// 982 /// # Unstable 983 /// 984 /// This option is currently unstable and its implementation is 985 /// incomplete. The API may change or be removed in the future. See 986 /// issue [tokio-rs/tokio#4516] for more details. 987 /// 988 /// # Examples 989 /// 990 /// The following demonstrates a runtime configured to shutdown on 991 /// panic. The first spawned task panics and results in the runtime 992 /// shutting down. The second spawned task never has a chance to 993 /// execute. The call to `block_on` will panic due to the runtime being 994 /// forcibly shutdown. 995 /// 996 /// ```should_panic 997 /// use tokio::runtime::{self, UnhandledPanic}; 998 /// 999 /// # pub fn main() { 1000 /// let rt = runtime::Builder::new_current_thread() 1001 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) 1002 /// .build() 1003 /// .unwrap(); 1004 /// 1005 /// rt.spawn(async { panic!("boom"); }); 1006 /// rt.spawn(async { 1007 /// // This task never completes. 1008 /// }); 1009 /// 1010 /// rt.block_on(async { 1011 /// // Do some work 1012 /// # loop { tokio::task::yield_now().await; } 1013 /// }) 1014 /// # } 1015 /// ``` 1016 /// 1017 /// [`JoinHandle`]: struct@crate::task::JoinHandle 1018 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516 1019 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self { 1020 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) { 1021 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime"); 1022 } 1023 1024 self.unhandled_panic = behavior; 1025 self 1026 } 1027 1028 /// Disables the LIFO task scheduler heuristic. 1029 /// 1030 /// The multi-threaded scheduler includes a heuristic for optimizing 1031 /// message-passing patterns. This heuristic results in the **last** 1032 /// scheduled task being polled first. 1033 /// 1034 /// To implement this heuristic, each worker thread has a slot which 1035 /// holds the task that should be polled next. However, this slot cannot 1036 /// be stolen by other worker threads, which can result in lower total 1037 /// throughput when tasks tend to have longer poll times. 1038 /// 1039 /// This configuration option will disable this heuristic resulting in 1040 /// all scheduled tasks being pushed into the worker-local queue, which 1041 /// is stealable. 1042 /// 1043 /// Consider trying this option when the task "scheduled" time is high 1044 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to 1045 /// collect this data. 1046 /// 1047 /// # Unstable 1048 /// 1049 /// This configuration option is considered a workaround for the LIFO 1050 /// slot not being stealable. When the slot becomes stealable, we will 1051 /// revisit whether or not this option is necessary. See 1052 /// issue [tokio-rs/tokio#4941]. 1053 /// 1054 /// # Examples 1055 /// 1056 /// ``` 1057 /// use tokio::runtime; 1058 /// 1059 /// let rt = runtime::Builder::new_multi_thread() 1060 /// .disable_lifo_slot() 1061 /// .build() 1062 /// .unwrap(); 1063 /// ``` 1064 /// 1065 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics 1066 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941 1067 pub fn disable_lifo_slot(&mut self) -> &mut Self { 1068 self.disable_lifo_slot = true; 1069 self 1070 } 1071 1072 /// Specifies the random number generation seed to use within all 1073 /// threads associated with the runtime being built. 1074 /// 1075 /// This option is intended to make certain parts of the runtime 1076 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of 1077 /// [`tokio::select!`] it will ensure that the order that branches are 1078 /// polled is deterministic. 1079 /// 1080 /// In addition to the code specifying `rng_seed` and interacting with 1081 /// the runtime, the internals of Tokio and the Rust compiler may affect 1082 /// the sequences of random numbers. In order to ensure repeatable 1083 /// results, the version of Tokio, the versions of all other 1084 /// dependencies that interact with Tokio, and the Rust compiler version 1085 /// should also all remain constant. 1086 /// 1087 /// # Examples 1088 /// 1089 /// ``` 1090 /// # use tokio::runtime::{self, RngSeed}; 1091 /// # pub fn main() { 1092 /// let seed = RngSeed::from_bytes(b"place your seed here"); 1093 /// let rt = runtime::Builder::new_current_thread() 1094 /// .rng_seed(seed) 1095 /// .build(); 1096 /// # } 1097 /// ``` 1098 /// 1099 /// [`tokio::select!`]: crate::select 1100 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { 1101 self.seed_generator = RngSeedGenerator::new(seed); 1102 self 1103 } 1104 } 1105 1106 cfg_unstable_metrics! { 1107 /// Enables tracking the distribution of task poll times. 1108 /// 1109 /// Task poll times are not instrumented by default as doing so requires 1110 /// calling [`Instant::now()`] twice per task poll, which could add 1111 /// measurable overhead. Use the [`Handle::metrics()`] to access the 1112 /// metrics data. 1113 /// 1114 /// The histogram uses fixed bucket sizes. In other words, the histogram 1115 /// buckets are not dynamic based on input values. Use the 1116 /// `metrics_poll_time_histogram` builder methods to configure the 1117 /// histogram details. 1118 /// 1119 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. 1120 /// This has an extremely low memory footprint, but may not provide enough granularity. For 1121 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`] 1122 /// to select [`LogHistogram`] instead. 1123 /// 1124 /// # Examples 1125 /// 1126 /// ``` 1127 /// use tokio::runtime; 1128 /// 1129 /// let rt = runtime::Builder::new_multi_thread() 1130 /// .enable_metrics_poll_time_histogram() 1131 /// .build() 1132 /// .unwrap(); 1133 /// # // Test default values here 1134 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } 1135 /// # let m = rt.handle().metrics(); 1136 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10); 1137 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100)); 1138 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200)); 1139 /// ``` 1140 /// 1141 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics 1142 /// [`Instant::now()`]: std::time::Instant::now 1143 /// [`LogHistogram`]: crate::runtime::LogHistogram 1144 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration 1145 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self { 1146 self.metrics_poll_count_histogram_enable = true; 1147 self 1148 } 1149 1150 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead. 1151 /// 1152 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram 1153 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")] 1154 #[doc(hidden)] 1155 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self { 1156 self.enable_metrics_poll_time_histogram() 1157 } 1158 1159 /// Sets the histogram scale for tracking the distribution of task poll 1160 /// times. 1161 /// 1162 /// Tracking the distribution of task poll times can be done using a 1163 /// linear or log scale. When using linear scale, each histogram bucket 1164 /// will represent the same range of poll times. When using log scale, 1165 /// each histogram bucket will cover a range twice as big as the 1166 /// previous bucket. 1167 /// 1168 /// **Default:** linear scale. 1169 /// 1170 /// # Examples 1171 /// 1172 /// ``` 1173 /// use tokio::runtime::{self, HistogramScale}; 1174 /// 1175 /// # #[allow(deprecated)] 1176 /// let rt = runtime::Builder::new_multi_thread() 1177 /// .enable_metrics_poll_time_histogram() 1178 /// .metrics_poll_count_histogram_scale(HistogramScale::Log) 1179 /// .build() 1180 /// .unwrap(); 1181 /// ``` 1182 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")] 1183 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self { 1184 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale); 1185 self 1186 } 1187 1188 /// Configure the histogram for tracking poll times 1189 /// 1190 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. 1191 /// This has an extremely low memory footprint, but may not provide enough granularity. For 1192 /// better granularity with low memory usage, use [`LogHistogram`] instead. 1193 /// 1194 /// # Examples 1195 /// Configure a [`LogHistogram`] with [default configuration]: 1196 /// ``` 1197 /// use tokio::runtime; 1198 /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; 1199 /// 1200 /// let rt = runtime::Builder::new_multi_thread() 1201 /// .enable_metrics_poll_time_histogram() 1202 /// .metrics_poll_time_histogram_configuration( 1203 /// HistogramConfiguration::log(LogHistogram::default()) 1204 /// ) 1205 /// .build() 1206 /// .unwrap(); 1207 /// ``` 1208 /// 1209 /// Configure a linear histogram with 100 buckets, each 10μs wide 1210 /// ``` 1211 /// use tokio::runtime; 1212 /// use std::time::Duration; 1213 /// use tokio::runtime::HistogramConfiguration; 1214 /// 1215 /// let rt = runtime::Builder::new_multi_thread() 1216 /// .enable_metrics_poll_time_histogram() 1217 /// .metrics_poll_time_histogram_configuration( 1218 /// HistogramConfiguration::linear(Duration::from_micros(10), 100) 1219 /// ) 1220 /// .build() 1221 /// .unwrap(); 1222 /// ``` 1223 /// 1224 /// Configure a [`LogHistogram`] with the following settings: 1225 /// - Measure times from 100ns to 120s 1226 /// - Max error of 0.1 1227 /// - No more than 1024 buckets 1228 /// ``` 1229 /// use std::time::Duration; 1230 /// use tokio::runtime; 1231 /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; 1232 /// 1233 /// let rt = runtime::Builder::new_multi_thread() 1234 /// .enable_metrics_poll_time_histogram() 1235 /// .metrics_poll_time_histogram_configuration( 1236 /// HistogramConfiguration::log(LogHistogram::builder() 1237 /// .max_value(Duration::from_secs(120)) 1238 /// .min_value(Duration::from_nanos(100)) 1239 /// .max_error(0.1) 1240 /// .max_buckets(1024) 1241 /// .expect("configuration uses 488 buckets") 1242 /// ) 1243 /// ) 1244 /// .build() 1245 /// .unwrap(); 1246 /// ``` 1247 /// 1248 /// [`LogHistogram`]: crate::runtime::LogHistogram 1249 /// [default configuration]: crate::runtime::LogHistogramBuilder 1250 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { 1251 self.metrics_poll_count_histogram.histogram_type = configuration.inner; 1252 self 1253 } 1254 1255 /// Sets the histogram resolution for tracking the distribution of task 1256 /// poll times. 1257 /// 1258 /// The resolution is the histogram's first bucket's range. When using a 1259 /// linear histogram scale, each bucket will cover the same range. When 1260 /// using a log scale, each bucket will cover a range twice as big as 1261 /// the previous bucket. In the log case, the resolution represents the 1262 /// smallest bucket range. 1263 /// 1264 /// Note that, when using log scale, the resolution is rounded up to the 1265 /// nearest power of 2 in nanoseconds. 1266 /// 1267 /// **Default:** 100 microseconds. 1268 /// 1269 /// # Examples 1270 /// 1271 /// ``` 1272 /// use tokio::runtime; 1273 /// use std::time::Duration; 1274 /// 1275 /// # #[allow(deprecated)] 1276 /// let rt = runtime::Builder::new_multi_thread() 1277 /// .enable_metrics_poll_time_histogram() 1278 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100)) 1279 /// .build() 1280 /// .unwrap(); 1281 /// ``` 1282 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")] 1283 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self { 1284 assert!(resolution > Duration::from_secs(0)); 1285 // Sanity check the argument and also make the cast below safe. 1286 assert!(resolution <= Duration::from_secs(1)); 1287 1288 let resolution = resolution.as_nanos() as u64; 1289 1290 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution); 1291 self 1292 } 1293 1294 /// Sets the number of buckets for the histogram tracking the 1295 /// distribution of task poll times. 1296 /// 1297 /// The last bucket tracks all greater values that fall out of other 1298 /// ranges. So, configuring the histogram using a linear scale, 1299 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task 1300 /// polls that take more than 450ms to complete. 1301 /// 1302 /// **Default:** 10 1303 /// 1304 /// # Examples 1305 /// 1306 /// ``` 1307 /// use tokio::runtime; 1308 /// 1309 /// # #[allow(deprecated)] 1310 /// let rt = runtime::Builder::new_multi_thread() 1311 /// .enable_metrics_poll_time_histogram() 1312 /// .metrics_poll_count_histogram_buckets(15) 1313 /// .build() 1314 /// .unwrap(); 1315 /// ``` 1316 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")] 1317 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self { 1318 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets); 1319 self 1320 } 1321 } 1322 1323 cfg_loom! { 1324 pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self { 1325 assert!(value.is_power_of_two()); 1326 self.local_queue_capacity = value; 1327 self 1328 } 1329 } 1330 build_current_thread_runtime(&mut self) -> io::Result<Runtime>1331 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> { 1332 use crate::runtime::runtime::Scheduler; 1333 1334 let (scheduler, handle, blocking_pool) = 1335 self.build_current_thread_runtime_components(None)?; 1336 1337 Ok(Runtime::from_parts( 1338 Scheduler::CurrentThread(scheduler), 1339 handle, 1340 blocking_pool, 1341 )) 1342 } 1343 1344 #[cfg(tokio_unstable)] build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime>1345 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> { 1346 use crate::runtime::local_runtime::LocalRuntimeScheduler; 1347 1348 let tid = std::thread::current().id(); 1349 1350 let (scheduler, handle, blocking_pool) = 1351 self.build_current_thread_runtime_components(Some(tid))?; 1352 1353 Ok(LocalRuntime::from_parts( 1354 LocalRuntimeScheduler::CurrentThread(scheduler), 1355 handle, 1356 blocking_pool, 1357 )) 1358 } 1359 build_current_thread_runtime_components( &mut self, local_tid: Option<ThreadId>, ) -> io::Result<(CurrentThread, Handle, BlockingPool)>1360 fn build_current_thread_runtime_components( 1361 &mut self, 1362 local_tid: Option<ThreadId>, 1363 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> { 1364 use crate::runtime::scheduler; 1365 use crate::runtime::Config; 1366 1367 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?; 1368 1369 // Blocking pool 1370 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); 1371 let blocking_spawner = blocking_pool.spawner().clone(); 1372 1373 // Generate a rng seed for this runtime. 1374 let seed_generator_1 = self.seed_generator.next_generator(); 1375 let seed_generator_2 = self.seed_generator.next_generator(); 1376 1377 // And now put a single-threaded scheduler on top of the timer. When 1378 // there are no futures ready to do something, it'll let the timer or 1379 // the reactor to generate some new stimuli for the futures to continue 1380 // in their life. 1381 let (scheduler, handle) = CurrentThread::new( 1382 driver, 1383 driver_handle, 1384 blocking_spawner, 1385 seed_generator_2, 1386 Config { 1387 before_park: self.before_park.clone(), 1388 after_unpark: self.after_unpark.clone(), 1389 before_spawn: self.before_spawn.clone(), 1390 after_termination: self.after_termination.clone(), 1391 global_queue_interval: self.global_queue_interval, 1392 event_interval: self.event_interval, 1393 local_queue_capacity: self.local_queue_capacity, 1394 #[cfg(tokio_unstable)] 1395 unhandled_panic: self.unhandled_panic.clone(), 1396 disable_lifo_slot: self.disable_lifo_slot, 1397 seed_generator: seed_generator_1, 1398 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), 1399 }, 1400 local_tid, 1401 ); 1402 1403 let handle = Handle { 1404 inner: scheduler::Handle::CurrentThread(handle), 1405 }; 1406 1407 Ok((scheduler, handle, blocking_pool)) 1408 } 1409 metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder>1410 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> { 1411 if self.metrics_poll_count_histogram_enable { 1412 Some(self.metrics_poll_count_histogram.clone()) 1413 } else { 1414 None 1415 } 1416 } 1417 } 1418 1419 cfg_io_driver! { 1420 impl Builder { 1421 /// Enables the I/O driver. 1422 /// 1423 /// Doing this enables using net, process, signal, and some I/O types on 1424 /// the runtime. 1425 /// 1426 /// # Examples 1427 /// 1428 /// ``` 1429 /// use tokio::runtime; 1430 /// 1431 /// let rt = runtime::Builder::new_multi_thread() 1432 /// .enable_io() 1433 /// .build() 1434 /// .unwrap(); 1435 /// ``` 1436 pub fn enable_io(&mut self) -> &mut Self { 1437 self.enable_io = true; 1438 self 1439 } 1440 1441 /// Enables the I/O driver and configures the max number of events to be 1442 /// processed per tick. 1443 /// 1444 /// # Examples 1445 /// 1446 /// ``` 1447 /// use tokio::runtime; 1448 /// 1449 /// let rt = runtime::Builder::new_current_thread() 1450 /// .enable_io() 1451 /// .max_io_events_per_tick(1024) 1452 /// .build() 1453 /// .unwrap(); 1454 /// ``` 1455 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self { 1456 self.nevents = capacity; 1457 self 1458 } 1459 } 1460 } 1461 1462 cfg_time! { 1463 impl Builder { 1464 /// Enables the time driver. 1465 /// 1466 /// Doing this enables using `tokio::time` on the runtime. 1467 /// 1468 /// # Examples 1469 /// 1470 /// ``` 1471 /// use tokio::runtime; 1472 /// 1473 /// let rt = runtime::Builder::new_multi_thread() 1474 /// .enable_time() 1475 /// .build() 1476 /// .unwrap(); 1477 /// ``` 1478 pub fn enable_time(&mut self) -> &mut Self { 1479 self.enable_time = true; 1480 self 1481 } 1482 } 1483 } 1484 1485 cfg_test_util! { 1486 impl Builder { 1487 /// Controls if the runtime's clock starts paused or advancing. 1488 /// 1489 /// Pausing time requires the current-thread runtime; construction of 1490 /// the runtime will panic otherwise. 1491 /// 1492 /// # Examples 1493 /// 1494 /// ``` 1495 /// use tokio::runtime; 1496 /// 1497 /// let rt = runtime::Builder::new_current_thread() 1498 /// .enable_time() 1499 /// .start_paused(true) 1500 /// .build() 1501 /// .unwrap(); 1502 /// ``` 1503 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { 1504 self.start_paused = start_paused; 1505 self 1506 } 1507 } 1508 } 1509 1510 cfg_rt_multi_thread! { 1511 impl Builder { 1512 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { 1513 use crate::loom::sys::num_cpus; 1514 use crate::runtime::{Config, runtime::Scheduler}; 1515 use crate::runtime::scheduler::{self, MultiThread}; 1516 1517 let core_threads = self.worker_threads.unwrap_or_else(num_cpus); 1518 1519 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?; 1520 1521 // Create the blocking pool 1522 let blocking_pool = 1523 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); 1524 let blocking_spawner = blocking_pool.spawner().clone(); 1525 1526 // Generate a rng seed for this runtime. 1527 let seed_generator_1 = self.seed_generator.next_generator(); 1528 let seed_generator_2 = self.seed_generator.next_generator(); 1529 1530 let (scheduler, handle, launch) = MultiThread::new( 1531 core_threads, 1532 driver, 1533 driver_handle, 1534 blocking_spawner, 1535 seed_generator_2, 1536 Config { 1537 before_park: self.before_park.clone(), 1538 after_unpark: self.after_unpark.clone(), 1539 before_spawn: self.before_spawn.clone(), 1540 after_termination: self.after_termination.clone(), 1541 global_queue_interval: self.global_queue_interval, 1542 event_interval: self.event_interval, 1543 local_queue_capacity: self.local_queue_capacity, 1544 #[cfg(tokio_unstable)] 1545 unhandled_panic: self.unhandled_panic.clone(), 1546 disable_lifo_slot: self.disable_lifo_slot, 1547 seed_generator: seed_generator_1, 1548 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), 1549 }, 1550 ); 1551 1552 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) }; 1553 1554 // Spawn the thread pool workers 1555 let _enter = handle.enter(); 1556 launch.launch(); 1557 1558 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool)) 1559 } 1560 1561 cfg_unstable! { 1562 fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> { 1563 use crate::loom::sys::num_cpus; 1564 use crate::runtime::{Config, runtime::Scheduler}; 1565 use crate::runtime::scheduler::MultiThreadAlt; 1566 1567 let core_threads = self.worker_threads.unwrap_or_else(num_cpus); 1568 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?; 1569 1570 // Create the blocking pool 1571 let blocking_pool = 1572 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); 1573 let blocking_spawner = blocking_pool.spawner().clone(); 1574 1575 // Generate a rng seed for this runtime. 1576 let seed_generator_1 = self.seed_generator.next_generator(); 1577 let seed_generator_2 = self.seed_generator.next_generator(); 1578 1579 let (scheduler, handle) = MultiThreadAlt::new( 1580 core_threads, 1581 driver, 1582 driver_handle, 1583 blocking_spawner, 1584 seed_generator_2, 1585 Config { 1586 before_park: self.before_park.clone(), 1587 after_unpark: self.after_unpark.clone(), 1588 before_spawn: self.before_spawn.clone(), 1589 after_termination: self.after_termination.clone(), 1590 global_queue_interval: self.global_queue_interval, 1591 event_interval: self.event_interval, 1592 local_queue_capacity: self.local_queue_capacity, 1593 #[cfg(tokio_unstable)] 1594 unhandled_panic: self.unhandled_panic.clone(), 1595 disable_lifo_slot: self.disable_lifo_slot, 1596 seed_generator: seed_generator_1, 1597 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), 1598 }, 1599 ); 1600 1601 Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool)) 1602 } 1603 } 1604 } 1605 } 1606 1607 impl fmt::Debug for Builder { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1608 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 1609 fmt.debug_struct("Builder") 1610 .field("worker_threads", &self.worker_threads) 1611 .field("max_blocking_threads", &self.max_blocking_threads) 1612 .field( 1613 "thread_name", 1614 &"<dyn Fn() -> String + Send + Sync + 'static>", 1615 ) 1616 .field("thread_stack_size", &self.thread_stack_size) 1617 .field("after_start", &self.after_start.as_ref().map(|_| "...")) 1618 .field("before_stop", &self.before_stop.as_ref().map(|_| "...")) 1619 .field("before_park", &self.before_park.as_ref().map(|_| "...")) 1620 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "...")) 1621 .finish() 1622 } 1623 } 1624