1 use crate::loom::thread::AccessError;
2 use crate::runtime::coop;
3
4 use std::cell::Cell;
5
6 #[cfg(any(feature = "rt", feature = "macros", feature = "time"))]
7 use crate::util::rand::FastRand;
8
9 cfg_rt! {
10 mod blocking;
11 pub(crate) use blocking::{disallow_block_in_place, try_enter_blocking_region, BlockingRegionGuard};
12
13 mod current;
14 pub(crate) use current::{with_current, try_set_current, SetCurrentGuard};
15
16 mod runtime;
17 pub(crate) use runtime::{EnterRuntime, enter_runtime};
18
19 mod scoped;
20 use scoped::Scoped;
21
22 use crate::runtime::{scheduler, task::Id};
23
24 use std::task::Waker;
25
26 cfg_taskdump! {
27 use crate::runtime::task::trace;
28 }
29 }
30
31 cfg_rt_multi_thread! {
32 mod runtime_mt;
33 pub(crate) use runtime_mt::{current_enter_context, exit_runtime};
34 }
35
/// Per-thread state kept in the `CONTEXT` thread-local.
///
/// Most fields are compiled in only when the corresponding cargo feature
/// (or cfg flag) is enabled; the initializer of `CONTEXT` must carry the
/// same gate for each field.
struct Context {
    /// Uniquely identifies the current thread.
    ///
    /// Starts out as `None` and is filled in on the first call to
    /// `thread_id()`.
    #[cfg(feature = "rt")]
    thread_id: Cell<Option<ThreadId>>,

    /// Handle to the runtime scheduler running on the current thread.
    #[cfg(feature = "rt")]
    current: current::HandleCell,

    /// Handle to the scheduler's internal "context"
    #[cfg(feature = "rt")]
    scheduler: Scoped<scheduler::Context>,

    /// Task id currently recorded for this thread, if any.
    ///
    /// Swapped by `set_current_task_id` and read by `current_task_id`.
    #[cfg(feature = "rt")]
    current_task_id: Cell<Option<Id>>,

    /// Tracks if the current thread is currently driving a runtime.
    /// Note, that if this is set to "entered", the current scheduler
    /// handle may not reference the runtime currently executing. This
    /// is because other runtime handles may be set to current from
    /// within a runtime.
    #[cfg(feature = "rt")]
    runtime: Cell<EnterRuntime>,

    /// Thread-local random number generator state.
    ///
    /// `None` until `thread_rng_n` creates the generator on first use; the
    /// advanced state is written back after every draw.
    #[cfg(any(feature = "rt", feature = "macros", feature = "time"))]
    rng: Cell<Option<FastRand>>,

    /// Tracks the amount of "work" a task may still do before yielding back to
    /// the scheduler
    budget: Cell<coop::Budget>,

    /// Task-dump tracing state, exposed through `with_trace`.
    ///
    /// Only present when the unstable taskdump cfg flags are set, the `rt`
    /// feature is enabled, and the target is Linux on one of the listed
    /// architectures.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    trace: trace::Context,
}
76
// Thread-local instance of `Context`, built from a `const` initializer.
// Every field below must carry the same cfg gate as its declaration in
// `struct Context`, otherwise the struct literal stops compiling for some
// feature combination.
tokio_thread_local! {
    static CONTEXT: Context = const {
        Context {
            // No thread id yet; `thread_id()` assigns one on first use.
            #[cfg(feature = "rt")]
            thread_id: Cell::new(None),

            // Tracks the current runtime handle to use when spawning,
            // accessing drivers, etc...
            #[cfg(feature = "rt")]
            current: current::HandleCell::new(),

            // Tracks the current scheduler internal context
            #[cfg(feature = "rt")]
            scheduler: Scoped::new(),

            // No task id recorded for this thread initially.
            #[cfg(feature = "rt")]
            current_task_id: Cell::new(None),

            // Tracks if the current thread is currently driving a runtime.
            // Note, that if this is set to "entered", the current scheduler
            // handle may not reference the runtime currently executing. This
            // is because other runtime handles may be set to current from
            // within a runtime.
            #[cfg(feature = "rt")]
            runtime: Cell::new(EnterRuntime::NotEntered),

            // The generator is created lazily by `thread_rng_n`.
            #[cfg(any(feature = "rt", feature = "macros", feature = "time"))]
            rng: Cell::new(None),

            // Initial budget comes from `coop::Budget::unconstrained()`.
            budget: Cell::new(coop::Budget::unconstrained()),

            // Task-dump tracing state; same cfg gate as the `trace` field.
            #[cfg(all(
                tokio_unstable,
                tokio_taskdump,
                feature = "rt",
                target_os = "linux",
                any(
                    target_arch = "aarch64",
                    target_arch = "x86",
                    target_arch = "x86_64"
                )
            ))]
            trace: trace::Context::new(),
        }
    }
}
123
/// Draws one value from this thread's `FastRand` generator.
///
/// The generator lives in `CONTEXT.rng` and is created with
/// `FastRand::new` the first time this function runs on a thread. `n` is
/// forwarded unchanged to `FastRand::fastrand_n`, whose result is returned.
/// NOTE(review): presumably the result lies in `0..n` — confirm against
/// `FastRand::fastrand_n`.
///
/// # Panics
///
/// Uses `CONTEXT.with`, not `try_with`, so per `LocalKey::with` this panics
/// if the thread-local is no longer accessible (e.g. during thread teardown).
#[cfg(any(
    feature = "time",
    feature = "macros",
    all(feature = "sync", feature = "rt")
))]
pub(crate) fn thread_rng_n(n: u32) -> u32 {
    CONTEXT.with(|ctx| {
        // Copy the generator out of the cell, creating it on first use.
        let mut generator = match ctx.rng.get() {
            Some(existing) => existing,
            None => FastRand::new(),
        };

        let value = generator.fastrand_n(n);

        // Store the advanced state so the next call continues the sequence.
        ctx.rng.set(Some(generator));
        value
    })
}
137
budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError>138 pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
139 CONTEXT.try_with(|ctx| f(&ctx.budget))
140 }
141
142 cfg_rt! {
143 use crate::runtime::ThreadId;
144
145 pub(crate) fn thread_id() -> Result<ThreadId, AccessError> {
146 CONTEXT.try_with(|ctx| {
147 match ctx.thread_id.get() {
148 Some(id) => id,
149 None => {
150 let id = ThreadId::next();
151 ctx.thread_id.set(Some(id));
152 id
153 }
154 }
155 })
156 }
157
158 pub(crate) fn set_current_task_id(id: Option<Id>) -> Option<Id> {
159 CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None)
160 }
161
162 pub(crate) fn current_task_id() -> Option<Id> {
163 CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None)
164 }
165
166 #[track_caller]
167 pub(crate) fn defer(waker: &Waker) {
168 with_scheduler(|maybe_scheduler| {
169 if let Some(scheduler) = maybe_scheduler {
170 scheduler.defer(waker);
171 } else {
172 // Called from outside of the runtime, immediately wake the
173 // task.
174 waker.wake_by_ref();
175 }
176 });
177 }
178
179 pub(super) fn set_scheduler<R>(v: &scheduler::Context, f: impl FnOnce() -> R) -> R {
180 CONTEXT.with(|c| c.scheduler.set(v, f))
181 }
182
183 #[track_caller]
184 pub(super) fn with_scheduler<R>(f: impl FnOnce(Option<&scheduler::Context>) -> R) -> R {
185 let mut f = Some(f);
186 CONTEXT.try_with(|c| {
187 let f = f.take().unwrap();
188 if matches!(c.runtime.get(), EnterRuntime::Entered { .. }) {
189 c.scheduler.with(f)
190 } else {
191 f(None)
192 }
193 })
194 .unwrap_or_else(|_| (f.take().unwrap())(None))
195 }
196
197 cfg_taskdump! {
198 /// SAFETY: Callers of this function must ensure that trace frames always
199 /// form a valid linked list.
200 pub(crate) unsafe fn with_trace<R>(f: impl FnOnce(&trace::Context) -> R) -> Option<R> {
201 CONTEXT.try_with(|c| f(&c.trace)).ok()
202 }
203 }
204 }
205