1 #![cfg_attr(loom, allow(unused_imports))]
2 
3 use crate::runtime::handle::Handle;
4 use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5 #[cfg(tokio_unstable)]
6 use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7 use crate::util::rand::{RngSeed, RngSeedGenerator};
8 
9 use crate::runtime::blocking::BlockingPool;
10 use crate::runtime::scheduler::CurrentThread;
11 use std::fmt;
12 use std::io;
13 use std::thread::ThreadId;
14 use std::time::Duration;
15 
16 /// Builds Tokio Runtime with custom configuration values.
17 ///
18 /// Methods can be chained in order to set the configuration values. The
19 /// Runtime is constructed by calling [`build`].
20 ///
21 /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22 /// or [`Builder::new_current_thread`].
23 ///
24 /// See function level documentation for details on the various configuration
25 /// settings.
26 ///
27 /// [`build`]: method@Self::build
28 /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29 /// [`Builder::new_current_thread`]: method@Self::new_current_thread
30 ///
31 /// # Examples
32 ///
33 /// ```
34 /// use tokio::runtime::Builder;
35 ///
36 /// fn main() {
37 ///     // build runtime
38 ///     let runtime = Builder::new_multi_thread()
39 ///         .worker_threads(4)
40 ///         .thread_name("my-custom-name")
41 ///         .thread_stack_size(3 * 1024 * 1024)
42 ///         .build()
43 ///         .unwrap();
44 ///
45 ///     // use runtime ...
46 /// }
47 /// ```
48 pub struct Builder {
49     /// Runtime type
50     kind: Kind,
51 
52     /// Whether or not to enable the I/O driver
53     enable_io: bool,
54     nevents: usize,
55 
56     /// Whether or not to enable the time driver
57     enable_time: bool,
58 
59     /// Whether or not the clock should start paused.
60     start_paused: bool,
61 
62     /// The number of worker threads, used by Runtime.
63     ///
64     /// Only used when not using the current-thread executor.
65     worker_threads: Option<usize>,
66 
67     /// Cap on thread usage.
68     max_blocking_threads: usize,
69 
70     /// Name fn used for threads spawned by the runtime.
71     pub(super) thread_name: ThreadNameFn,
72 
73     /// Stack size used for threads spawned by the runtime.
74     pub(super) thread_stack_size: Option<usize>,
75 
76     /// Callback to run after each thread starts.
77     pub(super) after_start: Option<Callback>,
78 
79     /// To run before each worker thread stops
80     pub(super) before_stop: Option<Callback>,
81 
82     /// To run before each worker thread is parked.
83     pub(super) before_park: Option<Callback>,
84 
85     /// To run after each thread is unparked.
86     pub(super) after_unpark: Option<Callback>,
87 
88     /// To run before each task is spawned.
89     pub(super) before_spawn: Option<TaskCallback>,
90 
91     /// To run after each task is terminated.
92     pub(super) after_termination: Option<TaskCallback>,
93 
94     /// Customizable keep alive timeout for `BlockingPool`
95     pub(super) keep_alive: Option<Duration>,
96 
97     /// How many ticks before pulling a task from the global/remote queue?
98     ///
99     /// When `None`, the value is unspecified and behavior details are left to
100     /// the scheduler. Each scheduler flavor could choose to either pick its own
101     /// default value or use some other strategy to decide when to poll from the
102     /// global queue. For example, the multi-threaded scheduler uses a
103     /// self-tuning strategy based on mean task poll times.
104     pub(super) global_queue_interval: Option<u32>,
105 
106     /// How many ticks before yielding to the driver for timer and I/O events?
107     pub(super) event_interval: u32,
108 
109     pub(super) local_queue_capacity: usize,
110 
111     /// When true, the multi-threade scheduler LIFO slot should not be used.
112     ///
113     /// This option should only be exposed as unstable.
114     pub(super) disable_lifo_slot: bool,
115 
116     /// Specify a random number generator seed to provide deterministic results
117     pub(super) seed_generator: RngSeedGenerator,
118 
119     /// When true, enables task poll count histogram instrumentation.
120     pub(super) metrics_poll_count_histogram_enable: bool,
121 
122     /// Configures the task poll count histogram
123     pub(super) metrics_poll_count_histogram: HistogramBuilder,
124 
125     #[cfg(tokio_unstable)]
126     pub(super) unhandled_panic: UnhandledPanic,
127 }
128 
129 cfg_unstable! {
130     /// How the runtime should respond to unhandled panics.
131     ///
132     /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
133     /// to configure the runtime behavior when a spawned task panics.
134     ///
135     /// See [`Builder::unhandled_panic`] for more details.
136     #[derive(Debug, Clone)]
137     #[non_exhaustive]
138     pub enum UnhandledPanic {
139         /// The runtime should ignore panics on spawned tasks.
140         ///
141         /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
142         /// tasks continue running normally.
143         ///
144         /// This is the default behavior.
145         ///
146         /// # Examples
147         ///
148         /// ```
149         /// use tokio::runtime::{self, UnhandledPanic};
150         ///
151         /// # pub fn main() {
152         /// let rt = runtime::Builder::new_current_thread()
153         ///     .unhandled_panic(UnhandledPanic::Ignore)
154         ///     .build()
155         ///     .unwrap();
156         ///
157         /// let task1 = rt.spawn(async { panic!("boom"); });
158         /// let task2 = rt.spawn(async {
159         ///     // This task completes normally
160         ///     "done"
161         /// });
162         ///
163         /// rt.block_on(async {
164         ///     // The panic on the first task is forwarded to the `JoinHandle`
165         ///     assert!(task1.await.is_err());
166         ///
167         ///     // The second task completes normally
168         ///     assert!(task2.await.is_ok());
169         /// })
170         /// # }
171         /// ```
172         ///
173         /// [`JoinHandle`]: struct@crate::task::JoinHandle
174         Ignore,
175 
176         /// The runtime should immediately shutdown if a spawned task panics.
177         ///
178         /// The runtime will immediately shutdown even if the panicked task's
179         /// [`JoinHandle`] is still available. All further spawned tasks will be
180         /// immediately dropped and call to [`Runtime::block_on`] will panic.
181         ///
182         /// # Examples
183         ///
184         /// ```should_panic
185         /// use tokio::runtime::{self, UnhandledPanic};
186         ///
187         /// # pub fn main() {
188         /// let rt = runtime::Builder::new_current_thread()
189         ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
190         ///     .build()
191         ///     .unwrap();
192         ///
193         /// rt.spawn(async { panic!("boom"); });
194         /// rt.spawn(async {
195         ///     // This task never completes.
196         /// });
197         ///
198         /// rt.block_on(async {
199         ///     // Do some work
200         /// # loop { tokio::task::yield_now().await; }
201         /// })
202         /// # }
203         /// ```
204         ///
205         /// [`JoinHandle`]: struct@crate::task::JoinHandle
206         ShutdownRuntime,
207     }
208 }
209 
210 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
211 
212 #[derive(Clone, Copy)]
213 pub(crate) enum Kind {
214     CurrentThread,
215     #[cfg(feature = "rt-multi-thread")]
216     MultiThread,
217     #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
218     MultiThreadAlt,
219 }
220 
221 impl Builder {
222     /// Returns a new builder with the current thread scheduler selected.
223     ///
224     /// Configuration methods can be chained on the return value.
225     ///
226     /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
227     /// [`LocalSet`].
228     ///
229     /// [`LocalSet`]: crate::task::LocalSet
new_current_thread() -> Builder230     pub fn new_current_thread() -> Builder {
231         #[cfg(loom)]
232         const EVENT_INTERVAL: u32 = 4;
233         // The number `61` is fairly arbitrary. I believe this value was copied from golang.
234         #[cfg(not(loom))]
235         const EVENT_INTERVAL: u32 = 61;
236 
237         Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
238     }
239 
240     /// Returns a new builder with the multi thread scheduler selected.
241     ///
242     /// Configuration methods can be chained on the return value.
243     #[cfg(feature = "rt-multi-thread")]
244     #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
new_multi_thread() -> Builder245     pub fn new_multi_thread() -> Builder {
246         // The number `61` is fairly arbitrary. I believe this value was copied from golang.
247         Builder::new(Kind::MultiThread, 61)
248     }
249 
250     cfg_unstable! {
251         /// Returns a new builder with the alternate multi thread scheduler
252         /// selected.
253         ///
254         /// The alternate multi threaded scheduler is an in-progress
255         /// candidate to replace the existing multi threaded scheduler. It
256         /// currently does not scale as well to 16+ processors.
257         ///
258         /// This runtime flavor is currently **not considered production
259         /// ready**.
260         ///
261         /// Configuration methods can be chained on the return value.
262         #[cfg(feature = "rt-multi-thread")]
263         #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
264         pub fn new_multi_thread_alt() -> Builder {
265             // The number `61` is fairly arbitrary. I believe this value was copied from golang.
266             Builder::new(Kind::MultiThreadAlt, 61)
267         }
268     }
269 
270     /// Returns a new runtime builder initialized with default configuration
271     /// values.
272     ///
273     /// Configuration methods can be chained on the return value.
new(kind: Kind, event_interval: u32) -> Builder274     pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
275         Builder {
276             kind,
277 
278             // I/O defaults to "off"
279             enable_io: false,
280             nevents: 1024,
281 
282             // Time defaults to "off"
283             enable_time: false,
284 
285             // The clock starts not-paused
286             start_paused: false,
287 
288             // Read from environment variable first in multi-threaded mode.
289             // Default to lazy auto-detection (one thread per CPU core)
290             worker_threads: None,
291 
292             max_blocking_threads: 512,
293 
294             // Default thread name
295             thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
296 
297             // Do not set a stack size by default
298             thread_stack_size: None,
299 
300             // No worker thread callbacks
301             after_start: None,
302             before_stop: None,
303             before_park: None,
304             after_unpark: None,
305 
306             before_spawn: None,
307             after_termination: None,
308 
309             keep_alive: None,
310 
311             // Defaults for these values depend on the scheduler kind, so we get them
312             // as parameters.
313             global_queue_interval: None,
314             event_interval,
315 
316             #[cfg(not(loom))]
317             local_queue_capacity: 256,
318 
319             #[cfg(loom)]
320             local_queue_capacity: 4,
321 
322             seed_generator: RngSeedGenerator::new(RngSeed::new()),
323 
324             #[cfg(tokio_unstable)]
325             unhandled_panic: UnhandledPanic::Ignore,
326 
327             metrics_poll_count_histogram_enable: false,
328 
329             metrics_poll_count_histogram: HistogramBuilder::default(),
330 
331             disable_lifo_slot: false,
332         }
333     }
334 
335     /// Enables both I/O and time drivers.
336     ///
337     /// Doing this is a shorthand for calling `enable_io` and `enable_time`
338     /// individually. If additional components are added to Tokio in the future,
339     /// `enable_all` will include these future components.
340     ///
341     /// # Examples
342     ///
343     /// ```
344     /// use tokio::runtime;
345     ///
346     /// let rt = runtime::Builder::new_multi_thread()
347     ///     .enable_all()
348     ///     .build()
349     ///     .unwrap();
350     /// ```
enable_all(&mut self) -> &mut Self351     pub fn enable_all(&mut self) -> &mut Self {
352         #[cfg(any(
353             feature = "net",
354             all(unix, feature = "process"),
355             all(unix, feature = "signal")
356         ))]
357         self.enable_io();
358         #[cfg(feature = "time")]
359         self.enable_time();
360 
361         self
362     }
363 
364     /// Sets the number of worker threads the `Runtime` will use.
365     ///
366     /// This can be any number above 0 though it is advised to keep this value
367     /// on the smaller side.
368     ///
369     /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
370     ///
371     /// # Default
372     ///
373     /// The default value is the number of cores available to the system.
374     ///
375     /// When using the `current_thread` runtime this method has no effect.
376     ///
377     /// # Examples
378     ///
379     /// ## Multi threaded runtime with 4 threads
380     ///
381     /// ```
382     /// use tokio::runtime;
383     ///
384     /// // This will spawn a work-stealing runtime with 4 worker threads.
385     /// let rt = runtime::Builder::new_multi_thread()
386     ///     .worker_threads(4)
387     ///     .build()
388     ///     .unwrap();
389     ///
390     /// rt.spawn(async move {});
391     /// ```
392     ///
393     /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
394     ///
395     /// ```
396     /// use tokio::runtime;
397     ///
398     /// // Create a runtime that _must_ be driven from a call
399     /// // to `Runtime::block_on`.
400     /// let rt = runtime::Builder::new_current_thread()
401     ///     .build()
402     ///     .unwrap();
403     ///
404     /// // This will run the runtime and future on the current thread
405     /// rt.block_on(async move {});
406     /// ```
407     ///
408     /// # Panics
409     ///
410     /// This will panic if `val` is not larger than `0`.
411     #[track_caller]
worker_threads(&mut self, val: usize) -> &mut Self412     pub fn worker_threads(&mut self, val: usize) -> &mut Self {
413         assert!(val > 0, "Worker threads cannot be set to 0");
414         self.worker_threads = Some(val);
415         self
416     }
417 
418     /// Specifies the limit for additional threads spawned by the Runtime.
419     ///
420     /// These threads are used for blocking operations like tasks spawned
421     /// through [`spawn_blocking`], this includes but is not limited to:
422     /// - [`fs`] operations
423     /// - dns resolution through [`ToSocketAddrs`]
424     /// - writing to [`Stdout`] or [`Stderr`]
425     /// - reading from [`Stdin`]
426     ///
427     /// Unlike the [`worker_threads`], they are not always active and will exit
428     /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
429     ///
430     /// It's recommended to not set this limit too low in order to avoid hanging on operations
431     /// requiring [`spawn_blocking`].
432     ///
433     /// The default value is 512.
434     ///
435     /// # Panics
436     ///
437     /// This will panic if `val` is not larger than `0`.
438     ///
439     /// # Upgrading from 0.x
440     ///
441     /// In old versions `max_threads` limited both blocking and worker threads, but the
442     /// current `max_blocking_threads` does not include async worker threads in the count.
443     ///
444     /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
445     /// [`fs`]: mod@crate::fs
446     /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
447     /// [`Stdout`]: struct@crate::io::Stdout
448     /// [`Stdin`]: struct@crate::io::Stdin
449     /// [`Stderr`]: struct@crate::io::Stderr
450     /// [`worker_threads`]: Self::worker_threads
451     /// [`thread_keep_alive`]: Self::thread_keep_alive
452     #[track_caller]
453     #[cfg_attr(docsrs, doc(alias = "max_threads"))]
max_blocking_threads(&mut self, val: usize) -> &mut Self454     pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
455         assert!(val > 0, "Max blocking threads cannot be set to 0");
456         self.max_blocking_threads = val;
457         self
458     }
459 
460     /// Sets name of threads spawned by the `Runtime`'s thread pool.
461     ///
462     /// The default name is "tokio-runtime-worker".
463     ///
464     /// # Examples
465     ///
466     /// ```
467     /// # use tokio::runtime;
468     ///
469     /// # pub fn main() {
470     /// let rt = runtime::Builder::new_multi_thread()
471     ///     .thread_name("my-pool")
472     ///     .build();
473     /// # }
474     /// ```
thread_name(&mut self, val: impl Into<String>) -> &mut Self475     pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
476         let val = val.into();
477         self.thread_name = std::sync::Arc::new(move || val.clone());
478         self
479     }
480 
481     /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
482     ///
483     /// The default name fn is `|| "tokio-runtime-worker".into()`.
484     ///
485     /// # Examples
486     ///
487     /// ```
488     /// # use tokio::runtime;
489     /// # use std::sync::atomic::{AtomicUsize, Ordering};
490     /// # pub fn main() {
491     /// let rt = runtime::Builder::new_multi_thread()
492     ///     .thread_name_fn(|| {
493     ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
494     ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
495     ///        format!("my-pool-{}", id)
496     ///     })
497     ///     .build();
498     /// # }
499     /// ```
thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static,500     pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
501     where
502         F: Fn() -> String + Send + Sync + 'static,
503     {
504         self.thread_name = std::sync::Arc::new(f);
505         self
506     }
507 
508     /// Sets the stack size (in bytes) for worker threads.
509     ///
510     /// The actual stack size may be greater than this value if the platform
511     /// specifies minimal stack size.
512     ///
513     /// The default stack size for spawned threads is 2 MiB, though this
514     /// particular stack size is subject to change in the future.
515     ///
516     /// # Examples
517     ///
518     /// ```
519     /// # use tokio::runtime;
520     ///
521     /// # pub fn main() {
522     /// let rt = runtime::Builder::new_multi_thread()
523     ///     .thread_stack_size(32 * 1024)
524     ///     .build();
525     /// # }
526     /// ```
thread_stack_size(&mut self, val: usize) -> &mut Self527     pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
528         self.thread_stack_size = Some(val);
529         self
530     }
531 
532     /// Executes function `f` after each thread is started but before it starts
533     /// doing work.
534     ///
535     /// This is intended for bookkeeping and monitoring use cases.
536     ///
537     /// # Examples
538     ///
539     /// ```
540     /// # use tokio::runtime;
541     /// # pub fn main() {
542     /// let runtime = runtime::Builder::new_multi_thread()
543     ///     .on_thread_start(|| {
544     ///         println!("thread started");
545     ///     })
546     ///     .build();
547     /// # }
548     /// ```
549     #[cfg(not(loom))]
on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,550     pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
551     where
552         F: Fn() + Send + Sync + 'static,
553     {
554         self.after_start = Some(std::sync::Arc::new(f));
555         self
556     }
557 
558     /// Executes function `f` before each thread stops.
559     ///
560     /// This is intended for bookkeeping and monitoring use cases.
561     ///
562     /// # Examples
563     ///
564     /// ```
565     /// # use tokio::runtime;
566     /// # pub fn main() {
567     /// let runtime = runtime::Builder::new_multi_thread()
568     ///     .on_thread_stop(|| {
569     ///         println!("thread stopping");
570     ///     })
571     ///     .build();
572     /// # }
573     /// ```
574     #[cfg(not(loom))]
on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,575     pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
576     where
577         F: Fn() + Send + Sync + 'static,
578     {
579         self.before_stop = Some(std::sync::Arc::new(f));
580         self
581     }
582 
583     /// Executes function `f` just before a thread is parked (goes idle).
584     /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
585     /// can be called, and may result in this thread being unparked immediately.
586     ///
587     /// This can be used to start work only when the executor is idle, or for bookkeeping
588     /// and monitoring purposes.
589     ///
590     /// Note: There can only be one park callback for a runtime; calling this function
591     /// more than once replaces the last callback defined, rather than adding to it.
592     ///
593     /// # Examples
594     ///
595     /// ## Multithreaded executor
596     /// ```
597     /// # use std::sync::Arc;
598     /// # use std::sync::atomic::{AtomicBool, Ordering};
599     /// # use tokio::runtime;
600     /// # use tokio::sync::Barrier;
601     /// # pub fn main() {
602     /// let once = AtomicBool::new(true);
603     /// let barrier = Arc::new(Barrier::new(2));
604     ///
605     /// let runtime = runtime::Builder::new_multi_thread()
606     ///     .worker_threads(1)
607     ///     .on_thread_park({
608     ///         let barrier = barrier.clone();
609     ///         move || {
610     ///             let barrier = barrier.clone();
611     ///             if once.swap(false, Ordering::Relaxed) {
612     ///                 tokio::spawn(async move { barrier.wait().await; });
613     ///            }
614     ///         }
615     ///     })
616     ///     .build()
617     ///     .unwrap();
618     ///
619     /// runtime.block_on(async {
620     ///    barrier.wait().await;
621     /// })
622     /// # }
623     /// ```
624     /// ## Current thread executor
625     /// ```
626     /// # use std::sync::Arc;
627     /// # use std::sync::atomic::{AtomicBool, Ordering};
628     /// # use tokio::runtime;
629     /// # use tokio::sync::Barrier;
630     /// # pub fn main() {
631     /// let once = AtomicBool::new(true);
632     /// let barrier = Arc::new(Barrier::new(2));
633     ///
634     /// let runtime = runtime::Builder::new_current_thread()
635     ///     .on_thread_park({
636     ///         let barrier = barrier.clone();
637     ///         move || {
638     ///             let barrier = barrier.clone();
639     ///             if once.swap(false, Ordering::Relaxed) {
640     ///                 tokio::spawn(async move { barrier.wait().await; });
641     ///            }
642     ///         }
643     ///     })
644     ///     .build()
645     ///     .unwrap();
646     ///
647     /// runtime.block_on(async {
648     ///    barrier.wait().await;
649     /// })
650     /// # }
651     /// ```
652     #[cfg(not(loom))]
on_thread_park<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,653     pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
654     where
655         F: Fn() + Send + Sync + 'static,
656     {
657         self.before_park = Some(std::sync::Arc::new(f));
658         self
659     }
660 
661     /// Executes function `f` just after a thread unparks (starts executing tasks).
662     ///
663     /// This is intended for bookkeeping and monitoring use cases; note that work
664     /// in this callback will increase latencies when the application has allowed one or
665     /// more runtime threads to go idle.
666     ///
667     /// Note: There can only be one unpark callback for a runtime; calling this function
668     /// more than once replaces the last callback defined, rather than adding to it.
669     ///
670     /// # Examples
671     ///
672     /// ```
673     /// # use tokio::runtime;
674     /// # pub fn main() {
675     /// let runtime = runtime::Builder::new_multi_thread()
676     ///     .on_thread_unpark(|| {
677     ///         println!("thread unparking");
678     ///     })
679     ///     .build();
680     ///
681     /// runtime.unwrap().block_on(async {
682     ///    tokio::task::yield_now().await;
683     ///    println!("Hello from Tokio!");
684     /// })
685     /// # }
686     /// ```
687     #[cfg(not(loom))]
on_thread_unpark<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,688     pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
689     where
690         F: Fn() + Send + Sync + 'static,
691     {
692         self.after_unpark = Some(std::sync::Arc::new(f));
693         self
694     }
695 
696     /// Executes function `f` just before a task is spawned.
697     ///
698     /// `f` is called within the Tokio context, so functions like
699     /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
700     /// invoked immediately.
701     ///
702     /// This can be used for bookkeeping or monitoring purposes.
703     ///
704     /// Note: There can only be one spawn callback for a runtime; calling this function more
705     /// than once replaces the last callback defined, rather than adding to it.
706     ///
707     /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
708     ///
709     /// **Note**: This is an [unstable API][unstable]. The public API of this type
710     /// may break in 1.x releases. See [the documentation on unstable
711     /// features][unstable] for details.
712     ///
713     /// [unstable]: crate#unstable-features
714     ///
715     /// # Examples
716     ///
717     /// ```
718     /// # use tokio::runtime;
719     /// # pub fn main() {
720     /// let runtime = runtime::Builder::new_current_thread()
721     ///     .on_task_spawn(|_| {
722     ///         println!("spawning task");
723     ///     })
724     ///     .build()
725     ///     .unwrap();
726     ///
727     /// runtime.block_on(async {
728     ///     tokio::task::spawn(std::future::ready(()));
729     ///
730     ///     for _ in 0..64 {
731     ///         tokio::task::yield_now().await;
732     ///     }
733     /// })
734     /// # }
735     /// ```
736     #[cfg(all(not(loom), tokio_unstable))]
737     #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
on_task_spawn<F>(&mut self, f: F) -> &mut Self where F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,738     pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
739     where
740         F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
741     {
742         self.before_spawn = Some(std::sync::Arc::new(f));
743         self
744     }
745 
746     /// Executes function `f` just after a task is terminated.
747     ///
748     /// `f` is called within the Tokio context, so functions like
749     /// [`tokio::spawn`](crate::spawn) can be called.
750     ///
751     /// This can be used for bookkeeping or monitoring purposes.
752     ///
753     /// Note: There can only be one task termination callback for a runtime; calling this
754     /// function more than once replaces the last callback defined, rather than adding to it.
755     ///
756     /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
757     ///
758     /// **Note**: This is an [unstable API][unstable]. The public API of this type
759     /// may break in 1.x releases. See [the documentation on unstable
760     /// features][unstable] for details.
761     ///
762     /// [unstable]: crate#unstable-features
763     ///
764     /// # Examples
765     ///
766     /// ```
767     /// # use tokio::runtime;
768     /// # pub fn main() {
769     /// let runtime = runtime::Builder::new_current_thread()
770     ///     .on_task_terminate(|_| {
771     ///         println!("killing task");
772     ///     })
773     ///     .build()
774     ///     .unwrap();
775     ///
776     /// runtime.block_on(async {
777     ///     tokio::task::spawn(std::future::ready(()));
778     ///
779     ///     for _ in 0..64 {
780     ///         tokio::task::yield_now().await;
781     ///     }
782     /// })
783     /// # }
784     /// ```
785     #[cfg(all(not(loom), tokio_unstable))]
786     #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
on_task_terminate<F>(&mut self, f: F) -> &mut Self where F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,787     pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
788     where
789         F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
790     {
791         self.after_termination = Some(std::sync::Arc::new(f));
792         self
793     }
794 
795     /// Creates the configured `Runtime`.
796     ///
797     /// The returned `Runtime` instance is ready to spawn tasks.
798     ///
799     /// # Examples
800     ///
801     /// ```
802     /// use tokio::runtime::Builder;
803     ///
804     /// let rt  = Builder::new_multi_thread().build().unwrap();
805     ///
806     /// rt.block_on(async {
807     ///     println!("Hello from the Tokio runtime");
808     /// });
809     /// ```
build(&mut self) -> io::Result<Runtime>810     pub fn build(&mut self) -> io::Result<Runtime> {
811         match &self.kind {
812             Kind::CurrentThread => self.build_current_thread_runtime(),
813             #[cfg(feature = "rt-multi-thread")]
814             Kind::MultiThread => self.build_threaded_runtime(),
815             #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
816             Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
817         }
818     }
819 
820     /// Creates the configured `LocalRuntime`.
821     ///
822     /// The returned `LocalRuntime` instance is ready to spawn tasks.
823     ///
824     /// # Panics
825     /// This will panic if `current_thread` is not the selected runtime flavor.
826     /// All other runtime flavors are unsupported by [`LocalRuntime`].
827     ///
828     /// [`LocalRuntime`]: [crate::runtime::LocalRuntime]
829     ///
830     /// # Examples
831     ///
832     /// ```
833     /// use tokio::runtime::Builder;
834     ///
835     /// let rt  = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
836     ///
837     /// rt.block_on(async {
838     ///     println!("Hello from the Tokio runtime");
839     /// });
840     /// ```
841     #[allow(unused_variables, unreachable_patterns)]
842     #[cfg(tokio_unstable)]
843     #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime>844     pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
845         match &self.kind {
846             Kind::CurrentThread => self.build_current_thread_local_runtime(),
847             _ => panic!("Only current_thread is supported when building a local runtime"),
848         }
849     }
850 
get_cfg(&self, workers: usize) -> driver::Cfg851     fn get_cfg(&self, workers: usize) -> driver::Cfg {
852         driver::Cfg {
853             enable_pause_time: match self.kind {
854                 Kind::CurrentThread => true,
855                 #[cfg(feature = "rt-multi-thread")]
856                 Kind::MultiThread => false,
857                 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
858                 Kind::MultiThreadAlt => false,
859             },
860             enable_io: self.enable_io,
861             enable_time: self.enable_time,
862             start_paused: self.start_paused,
863             nevents: self.nevents,
864             workers,
865         }
866     }
867 
868     /// Sets a custom timeout for a thread in the blocking pool.
869     ///
870     /// By default, the timeout for a thread is set to 10 seconds. This can
871     /// be overridden using `.thread_keep_alive()`.
872     ///
873     /// # Example
874     ///
875     /// ```
876     /// # use tokio::runtime;
877     /// # use std::time::Duration;
878     /// # pub fn main() {
879     /// let rt = runtime::Builder::new_multi_thread()
880     ///     .thread_keep_alive(Duration::from_millis(100))
881     ///     .build();
882     /// # }
883     /// ```
thread_keep_alive(&mut self, duration: Duration) -> &mut Self884     pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
885         self.keep_alive = Some(duration);
886         self
887     }
888 
889     /// Sets the number of scheduler ticks after which the scheduler will poll the global
890     /// task queue.
891     ///
892     /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
893     ///
894     /// By default the global queue interval is 31 for the current-thread scheduler. Please see
895     /// [the module documentation] for the default behavior of the multi-thread scheduler.
896     ///
897     /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
898     /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
899     /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
900     /// getting started on new work, especially if tasks frequently yield rather than complete
901     /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
902     /// is a good choice when most tasks quickly complete polling.
903     ///
904     /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
905     ///
906     /// # Panics
907     ///
908     /// This function will panic if 0 is passed as an argument.
909     ///
910     /// # Examples
911     ///
912     /// ```
913     /// # use tokio::runtime;
914     /// # pub fn main() {
915     /// let rt = runtime::Builder::new_multi_thread()
916     ///     .global_queue_interval(31)
917     ///     .build();
918     /// # }
919     /// ```
920     #[track_caller]
global_queue_interval(&mut self, val: u32) -> &mut Self921     pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
922         assert!(val > 0, "global_queue_interval must be greater than 0");
923         self.global_queue_interval = Some(val);
924         self
925     }
926 
927     /// Sets the number of scheduler ticks after which the scheduler will poll for
928     /// external events (timers, I/O, and so on).
929     ///
930     /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
931     ///
932     /// By default, the event interval is `61` for all scheduler types.
933     ///
934     /// Setting the event interval determines the effective "priority" of delivering
935     /// these external events (which may wake up additional tasks), compared to
936     /// executing tasks that are currently ready to run. A smaller value is useful
937     /// when tasks frequently spend a long time in polling, or frequently yield,
938     /// which can result in overly long delays picking up I/O events. Conversely,
939     /// picking up new events requires extra synchronization and syscall overhead,
940     /// so if tasks generally complete their polling quickly, a higher event interval
941     /// will minimize that overhead while still keeping the scheduler responsive to
942     /// events.
943     ///
944     /// # Examples
945     ///
946     /// ```
947     /// # use tokio::runtime;
948     /// # pub fn main() {
949     /// let rt = runtime::Builder::new_multi_thread()
950     ///     .event_interval(31)
951     ///     .build();
952     /// # }
953     /// ```
event_interval(&mut self, val: u32) -> &mut Self954     pub fn event_interval(&mut self, val: u32) -> &mut Self {
955         self.event_interval = val;
956         self
957     }
958 
959     cfg_unstable! {
960         /// Configure how the runtime responds to an unhandled panic on a
961         /// spawned task.
962         ///
963         /// By default, an unhandled panic (i.e. a panic not caught by
964         /// [`std::panic::catch_unwind`]) has no impact on the runtime's
965         /// execution. The panic's error value is forwarded to the task's
966         /// [`JoinHandle`] and all other spawned tasks continue running.
967         ///
968         /// The `unhandled_panic` option enables configuring this behavior.
969         ///
970         /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
971         ///   spawned tasks have no impact on the runtime's execution.
972         /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
973         ///   shutdown immediately when a spawned task panics even if that
974         ///   task's `JoinHandle` has not been dropped. All other spawned tasks
975         ///   will immediately terminate and further calls to
976         ///   [`Runtime::block_on`] will panic.
977         ///
978         /// # Panics
979         /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
980         /// on a runtime other than the current thread runtime.
981         ///
982         /// # Unstable
983         ///
984         /// This option is currently unstable and its implementation is
985         /// incomplete. The API may change or be removed in the future. See
986         /// issue [tokio-rs/tokio#4516] for more details.
987         ///
988         /// # Examples
989         ///
990         /// The following demonstrates a runtime configured to shutdown on
991         /// panic. The first spawned task panics and results in the runtime
992         /// shutting down. The second spawned task never has a chance to
993         /// execute. The call to `block_on` will panic due to the runtime being
994         /// forcibly shutdown.
995         ///
996         /// ```should_panic
997         /// use tokio::runtime::{self, UnhandledPanic};
998         ///
999         /// # pub fn main() {
1000         /// let rt = runtime::Builder::new_current_thread()
1001         ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1002         ///     .build()
1003         ///     .unwrap();
1004         ///
1005         /// rt.spawn(async { panic!("boom"); });
1006         /// rt.spawn(async {
1007         ///     // This task never completes.
1008         /// });
1009         ///
1010         /// rt.block_on(async {
1011         ///     // Do some work
1012         /// # loop { tokio::task::yield_now().await; }
1013         /// })
1014         /// # }
1015         /// ```
1016         ///
1017         /// [`JoinHandle`]: struct@crate::task::JoinHandle
1018         /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1019         pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1020             if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1021                 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1022             }
1023 
1024             self.unhandled_panic = behavior;
1025             self
1026         }
1027 
1028         /// Disables the LIFO task scheduler heuristic.
1029         ///
1030         /// The multi-threaded scheduler includes a heuristic for optimizing
1031         /// message-passing patterns. This heuristic results in the **last**
1032         /// scheduled task being polled first.
1033         ///
1034         /// To implement this heuristic, each worker thread has a slot which
1035         /// holds the task that should be polled next. However, this slot cannot
1036         /// be stolen by other worker threads, which can result in lower total
1037         /// throughput when tasks tend to have longer poll times.
1038         ///
1039         /// This configuration option will disable this heuristic resulting in
1040         /// all scheduled tasks being pushed into the worker-local queue, which
1041         /// is stealable.
1042         ///
1043         /// Consider trying this option when the task "scheduled" time is high
1044         /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1045         /// collect this data.
1046         ///
1047         /// # Unstable
1048         ///
1049         /// This configuration option is considered a workaround for the LIFO
1050         /// slot not being stealable. When the slot becomes stealable, we will
1051         /// revisit whether or not this option is necessary. See
1052         /// issue [tokio-rs/tokio#4941].
1053         ///
1054         /// # Examples
1055         ///
1056         /// ```
1057         /// use tokio::runtime;
1058         ///
1059         /// let rt = runtime::Builder::new_multi_thread()
1060         ///     .disable_lifo_slot()
1061         ///     .build()
1062         ///     .unwrap();
1063         /// ```
1064         ///
1065         /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1066         /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1067         pub fn disable_lifo_slot(&mut self) -> &mut Self {
1068             self.disable_lifo_slot = true;
1069             self
1070         }
1071 
1072         /// Specifies the random number generation seed to use within all
1073         /// threads associated with the runtime being built.
1074         ///
1075         /// This option is intended to make certain parts of the runtime
1076         /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1077         /// [`tokio::select!`] it will ensure that the order that branches are
1078         /// polled is deterministic.
1079         ///
1080         /// In addition to the code specifying `rng_seed` and interacting with
1081         /// the runtime, the internals of Tokio and the Rust compiler may affect
1082         /// the sequences of random numbers. In order to ensure repeatable
1083         /// results, the version of Tokio, the versions of all other
1084         /// dependencies that interact with Tokio, and the Rust compiler version
1085         /// should also all remain constant.
1086         ///
1087         /// # Examples
1088         ///
1089         /// ```
1090         /// # use tokio::runtime::{self, RngSeed};
1091         /// # pub fn main() {
1092         /// let seed = RngSeed::from_bytes(b"place your seed here");
1093         /// let rt = runtime::Builder::new_current_thread()
1094         ///     .rng_seed(seed)
1095         ///     .build();
1096         /// # }
1097         /// ```
1098         ///
1099         /// [`tokio::select!`]: crate::select
1100         pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1101             self.seed_generator = RngSeedGenerator::new(seed);
1102             self
1103         }
1104     }
1105 
1106     cfg_unstable_metrics! {
1107         /// Enables tracking the distribution of task poll times.
1108         ///
1109         /// Task poll times are not instrumented by default as doing so requires
1110         /// calling [`Instant::now()`] twice per task poll, which could add
1111         /// measurable overhead. Use the [`Handle::metrics()`] to access the
1112         /// metrics data.
1113         ///
1114         /// The histogram uses fixed bucket sizes. In other words, the histogram
1115         /// buckets are not dynamic based on input values. Use the
1116         /// `metrics_poll_time_histogram` builder methods to configure the
1117         /// histogram details.
1118         ///
1119         /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1120         /// This has an extremely low memory footprint, but may not provide enough granularity. For
1121         /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1122         /// to select [`LogHistogram`] instead.
1123         ///
1124         /// # Examples
1125         ///
1126         /// ```
1127         /// use tokio::runtime;
1128         ///
1129         /// let rt = runtime::Builder::new_multi_thread()
1130         ///     .enable_metrics_poll_time_histogram()
1131         ///     .build()
1132         ///     .unwrap();
1133         /// # // Test default values here
1134         /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1135         /// # let m = rt.handle().metrics();
1136         /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1137         /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1138         /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1139         /// ```
1140         ///
1141         /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1142         /// [`Instant::now()`]: std::time::Instant::now
1143         /// [`LogHistogram`]: crate::runtime::LogHistogram
1144         /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1145         pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1146             self.metrics_poll_count_histogram_enable = true;
1147             self
1148         }
1149 
1150         /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1151         ///
1152         /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1153         #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1154         #[doc(hidden)]
1155         pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1156             self.enable_metrics_poll_time_histogram()
1157         }
1158 
1159         /// Sets the histogram scale for tracking the distribution of task poll
1160         /// times.
1161         ///
1162         /// Tracking the distribution of task poll times can be done using a
1163         /// linear or log scale. When using linear scale, each histogram bucket
1164         /// will represent the same range of poll times. When using log scale,
1165         /// each histogram bucket will cover a range twice as big as the
1166         /// previous bucket.
1167         ///
1168         /// **Default:** linear scale.
1169         ///
1170         /// # Examples
1171         ///
1172         /// ```
1173         /// use tokio::runtime::{self, HistogramScale};
1174         ///
1175         /// # #[allow(deprecated)]
1176         /// let rt = runtime::Builder::new_multi_thread()
1177         ///     .enable_metrics_poll_time_histogram()
1178         ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1179         ///     .build()
1180         ///     .unwrap();
1181         /// ```
1182         #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1183         pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1184             self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1185             self
1186         }
1187 
1188         /// Configure the histogram for tracking poll times
1189         ///
1190         /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1191         /// This has an extremely low memory footprint, but may not provide enough granularity. For
1192         /// better granularity with low memory usage, use [`LogHistogram`] instead.
1193         ///
1194         /// # Examples
1195         /// Configure a [`LogHistogram`] with [default configuration]:
1196         /// ```
1197         /// use tokio::runtime;
1198         /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1199         ///
1200         /// let rt = runtime::Builder::new_multi_thread()
1201         ///     .enable_metrics_poll_time_histogram()
1202         ///     .metrics_poll_time_histogram_configuration(
1203         ///         HistogramConfiguration::log(LogHistogram::default())
1204         ///     )
1205         ///     .build()
1206         ///     .unwrap();
1207         /// ```
1208         ///
1209         /// Configure a linear histogram with 100 buckets, each 10μs wide
1210         /// ```
1211         /// use tokio::runtime;
1212         /// use std::time::Duration;
1213         /// use tokio::runtime::HistogramConfiguration;
1214         ///
1215         /// let rt = runtime::Builder::new_multi_thread()
1216         ///     .enable_metrics_poll_time_histogram()
1217         ///     .metrics_poll_time_histogram_configuration(
1218         ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1219         ///     )
1220         ///     .build()
1221         ///     .unwrap();
1222         /// ```
1223         ///
1224         /// Configure a [`LogHistogram`] with the following settings:
1225         /// - Measure times from 100ns to 120s
1226         /// - Max error of 0.1
1227         /// - No more than 1024 buckets
1228         /// ```
1229         /// use std::time::Duration;
1230         /// use tokio::runtime;
1231         /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1232         ///
1233         /// let rt = runtime::Builder::new_multi_thread()
1234         ///     .enable_metrics_poll_time_histogram()
1235         ///     .metrics_poll_time_histogram_configuration(
1236         ///         HistogramConfiguration::log(LogHistogram::builder()
1237         ///             .max_value(Duration::from_secs(120))
1238         ///             .min_value(Duration::from_nanos(100))
1239         ///             .max_error(0.1)
1240         ///             .max_buckets(1024)
1241         ///             .expect("configuration uses 488 buckets")
1242         ///         )
1243         ///     )
1244         ///     .build()
1245         ///     .unwrap();
1246         /// ```
1247         ///
1248         /// [`LogHistogram`]: crate::runtime::LogHistogram
1249         /// [default configuration]: crate::runtime::LogHistogramBuilder
1250         pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1251             self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1252             self
1253         }
1254 
1255         /// Sets the histogram resolution for tracking the distribution of task
1256         /// poll times.
1257         ///
1258         /// The resolution is the histogram's first bucket's range. When using a
1259         /// linear histogram scale, each bucket will cover the same range. When
1260         /// using a log scale, each bucket will cover a range twice as big as
1261         /// the previous bucket. In the log case, the resolution represents the
1262         /// smallest bucket range.
1263         ///
1264         /// Note that, when using log scale, the resolution is rounded up to the
1265         /// nearest power of 2 in nanoseconds.
1266         ///
1267         /// **Default:** 100 microseconds.
1268         ///
1269         /// # Examples
1270         ///
1271         /// ```
1272         /// use tokio::runtime;
1273         /// use std::time::Duration;
1274         ///
1275         /// # #[allow(deprecated)]
1276         /// let rt = runtime::Builder::new_multi_thread()
1277         ///     .enable_metrics_poll_time_histogram()
1278         ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1279         ///     .build()
1280         ///     .unwrap();
1281         /// ```
1282         #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1283         pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1284             assert!(resolution > Duration::from_secs(0));
1285             // Sanity check the argument and also make the cast below safe.
1286             assert!(resolution <= Duration::from_secs(1));
1287 
1288             let resolution = resolution.as_nanos() as u64;
1289 
1290             self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1291             self
1292         }
1293 
1294         /// Sets the number of buckets for the histogram tracking the
1295         /// distribution of task poll times.
1296         ///
1297         /// The last bucket tracks all greater values that fall out of other
1298         /// ranges. So, configuring the histogram using a linear scale,
1299         /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1300         /// polls that take more than 450ms to complete.
1301         ///
1302         /// **Default:** 10
1303         ///
1304         /// # Examples
1305         ///
1306         /// ```
1307         /// use tokio::runtime;
1308         ///
1309         /// # #[allow(deprecated)]
1310         /// let rt = runtime::Builder::new_multi_thread()
1311         ///     .enable_metrics_poll_time_histogram()
1312         ///     .metrics_poll_count_histogram_buckets(15)
1313         ///     .build()
1314         ///     .unwrap();
1315         /// ```
1316         #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1317         pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1318             self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1319             self
1320         }
1321     }
1322 
1323     cfg_loom! {
1324         pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1325             assert!(value.is_power_of_two());
1326             self.local_queue_capacity = value;
1327             self
1328         }
1329     }
1330 
build_current_thread_runtime(&mut self) -> io::Result<Runtime>1331     fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1332         use crate::runtime::runtime::Scheduler;
1333 
1334         let (scheduler, handle, blocking_pool) =
1335             self.build_current_thread_runtime_components(None)?;
1336 
1337         Ok(Runtime::from_parts(
1338             Scheduler::CurrentThread(scheduler),
1339             handle,
1340             blocking_pool,
1341         ))
1342     }
1343 
1344     #[cfg(tokio_unstable)]
build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime>1345     fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1346         use crate::runtime::local_runtime::LocalRuntimeScheduler;
1347 
1348         let tid = std::thread::current().id();
1349 
1350         let (scheduler, handle, blocking_pool) =
1351             self.build_current_thread_runtime_components(Some(tid))?;
1352 
1353         Ok(LocalRuntime::from_parts(
1354             LocalRuntimeScheduler::CurrentThread(scheduler),
1355             handle,
1356             blocking_pool,
1357         ))
1358     }
1359 
build_current_thread_runtime_components( &mut self, local_tid: Option<ThreadId>, ) -> io::Result<(CurrentThread, Handle, BlockingPool)>1360     fn build_current_thread_runtime_components(
1361         &mut self,
1362         local_tid: Option<ThreadId>,
1363     ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1364         use crate::runtime::scheduler;
1365         use crate::runtime::Config;
1366 
1367         let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;
1368 
1369         // Blocking pool
1370         let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1371         let blocking_spawner = blocking_pool.spawner().clone();
1372 
1373         // Generate a rng seed for this runtime.
1374         let seed_generator_1 = self.seed_generator.next_generator();
1375         let seed_generator_2 = self.seed_generator.next_generator();
1376 
1377         // And now put a single-threaded scheduler on top of the timer. When
1378         // there are no futures ready to do something, it'll let the timer or
1379         // the reactor to generate some new stimuli for the futures to continue
1380         // in their life.
1381         let (scheduler, handle) = CurrentThread::new(
1382             driver,
1383             driver_handle,
1384             blocking_spawner,
1385             seed_generator_2,
1386             Config {
1387                 before_park: self.before_park.clone(),
1388                 after_unpark: self.after_unpark.clone(),
1389                 before_spawn: self.before_spawn.clone(),
1390                 after_termination: self.after_termination.clone(),
1391                 global_queue_interval: self.global_queue_interval,
1392                 event_interval: self.event_interval,
1393                 local_queue_capacity: self.local_queue_capacity,
1394                 #[cfg(tokio_unstable)]
1395                 unhandled_panic: self.unhandled_panic.clone(),
1396                 disable_lifo_slot: self.disable_lifo_slot,
1397                 seed_generator: seed_generator_1,
1398                 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1399             },
1400             local_tid,
1401         );
1402 
1403         let handle = Handle {
1404             inner: scheduler::Handle::CurrentThread(handle),
1405         };
1406 
1407         Ok((scheduler, handle, blocking_pool))
1408     }
1409 
metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder>1410     fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1411         if self.metrics_poll_count_histogram_enable {
1412             Some(self.metrics_poll_count_histogram.clone())
1413         } else {
1414             None
1415         }
1416     }
1417 }
1418 
1419 cfg_io_driver! {
1420     impl Builder {
1421         /// Enables the I/O driver.
1422         ///
1423         /// Doing this enables using net, process, signal, and some I/O types on
1424         /// the runtime.
1425         ///
1426         /// # Examples
1427         ///
1428         /// ```
1429         /// use tokio::runtime;
1430         ///
1431         /// let rt = runtime::Builder::new_multi_thread()
1432         ///     .enable_io()
1433         ///     .build()
1434         ///     .unwrap();
1435         /// ```
1436         pub fn enable_io(&mut self) -> &mut Self {
1437             self.enable_io = true;
1438             self
1439         }
1440 
1441         /// Enables the I/O driver and configures the max number of events to be
1442         /// processed per tick.
1443         ///
1444         /// # Examples
1445         ///
1446         /// ```
1447         /// use tokio::runtime;
1448         ///
1449         /// let rt = runtime::Builder::new_current_thread()
1450         ///     .enable_io()
1451         ///     .max_io_events_per_tick(1024)
1452         ///     .build()
1453         ///     .unwrap();
1454         /// ```
1455         pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1456             self.nevents = capacity;
1457             self
1458         }
1459     }
1460 }
1461 
1462 cfg_time! {
1463     impl Builder {
1464         /// Enables the time driver.
1465         ///
1466         /// Doing this enables using `tokio::time` on the runtime.
1467         ///
1468         /// # Examples
1469         ///
1470         /// ```
1471         /// use tokio::runtime;
1472         ///
1473         /// let rt = runtime::Builder::new_multi_thread()
1474         ///     .enable_time()
1475         ///     .build()
1476         ///     .unwrap();
1477         /// ```
1478         pub fn enable_time(&mut self) -> &mut Self {
1479             self.enable_time = true;
1480             self
1481         }
1482     }
1483 }
1484 
1485 cfg_test_util! {
1486     impl Builder {
1487         /// Controls if the runtime's clock starts paused or advancing.
1488         ///
1489         /// Pausing time requires the current-thread runtime; construction of
1490         /// the runtime will panic otherwise.
1491         ///
1492         /// # Examples
1493         ///
1494         /// ```
1495         /// use tokio::runtime;
1496         ///
1497         /// let rt = runtime::Builder::new_current_thread()
1498         ///     .enable_time()
1499         ///     .start_paused(true)
1500         ///     .build()
1501         ///     .unwrap();
1502         /// ```
1503         pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1504             self.start_paused = start_paused;
1505             self
1506         }
1507     }
1508 }
1509 
1510 cfg_rt_multi_thread! {
1511     impl Builder {
1512         fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1513             use crate::loom::sys::num_cpus;
1514             use crate::runtime::{Config, runtime::Scheduler};
1515             use crate::runtime::scheduler::{self, MultiThread};
1516 
1517             let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1518 
1519             let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
1520 
1521             // Create the blocking pool
1522             let blocking_pool =
1523                 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1524             let blocking_spawner = blocking_pool.spawner().clone();
1525 
1526             // Generate a rng seed for this runtime.
1527             let seed_generator_1 = self.seed_generator.next_generator();
1528             let seed_generator_2 = self.seed_generator.next_generator();
1529 
1530             let (scheduler, handle, launch) = MultiThread::new(
1531                 core_threads,
1532                 driver,
1533                 driver_handle,
1534                 blocking_spawner,
1535                 seed_generator_2,
1536                 Config {
1537                     before_park: self.before_park.clone(),
1538                     after_unpark: self.after_unpark.clone(),
1539                     before_spawn: self.before_spawn.clone(),
1540                     after_termination: self.after_termination.clone(),
1541                     global_queue_interval: self.global_queue_interval,
1542                     event_interval: self.event_interval,
1543                     local_queue_capacity: self.local_queue_capacity,
1544                     #[cfg(tokio_unstable)]
1545                     unhandled_panic: self.unhandled_panic.clone(),
1546                     disable_lifo_slot: self.disable_lifo_slot,
1547                     seed_generator: seed_generator_1,
1548                     metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1549                 },
1550             );
1551 
1552             let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1553 
1554             // Spawn the thread pool workers
1555             let _enter = handle.enter();
1556             launch.launch();
1557 
1558             Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1559         }
1560 
1561         cfg_unstable! {
1562             fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
1563                 use crate::loom::sys::num_cpus;
1564                 use crate::runtime::{Config, runtime::Scheduler};
1565                 use crate::runtime::scheduler::MultiThreadAlt;
1566 
1567                 let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1568                 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
1569 
1570                 // Create the blocking pool
1571                 let blocking_pool =
1572                     blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1573                 let blocking_spawner = blocking_pool.spawner().clone();
1574 
1575                 // Generate a rng seed for this runtime.
1576                 let seed_generator_1 = self.seed_generator.next_generator();
1577                 let seed_generator_2 = self.seed_generator.next_generator();
1578 
1579                 let (scheduler, handle) = MultiThreadAlt::new(
1580                     core_threads,
1581                     driver,
1582                     driver_handle,
1583                     blocking_spawner,
1584                     seed_generator_2,
1585                     Config {
1586                         before_park: self.before_park.clone(),
1587                         after_unpark: self.after_unpark.clone(),
1588                         before_spawn: self.before_spawn.clone(),
1589                         after_termination: self.after_termination.clone(),
1590                         global_queue_interval: self.global_queue_interval,
1591                         event_interval: self.event_interval,
1592                         local_queue_capacity: self.local_queue_capacity,
1593                         #[cfg(tokio_unstable)]
1594                         unhandled_panic: self.unhandled_panic.clone(),
1595                         disable_lifo_slot: self.disable_lifo_slot,
1596                         seed_generator: seed_generator_1,
1597                         metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1598                     },
1599                 );
1600 
1601                 Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
1602             }
1603         }
1604     }
1605 }
1606 
1607 impl fmt::Debug for Builder {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1608     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1609         fmt.debug_struct("Builder")
1610             .field("worker_threads", &self.worker_threads)
1611             .field("max_blocking_threads", &self.max_blocking_threads)
1612             .field(
1613                 "thread_name",
1614                 &"<dyn Fn() -> String + Send + Sync + 'static>",
1615             )
1616             .field("thread_stack_size", &self.thread_stack_size)
1617             .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1618             .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1619             .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1620             .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1621             .finish()
1622     }
1623 }
1624