use crate::loom::sync::Arc;
use crate::runtime::context;
use crate::runtime::scheduler::{self, current_thread, Inject};
use crate::task::Id;

use backtrace::BacktraceFrame;
use std::cell::Cell;
use std::collections::VecDeque;
use std::ffi::c_void;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::ptr::{self, NonNull};
use std::task::{self, Poll};

mod symbol;
mod tree;

use symbol::Symbol;
use tree::Tree;

use super::{Notified, OwnedTasks, Schedule};

type Backtrace = Vec<BacktraceFrame>;
type SymbolTrace = Vec<Symbol>;

/// The ambient backtracing context.
pub(crate) struct Context {
    /// The address of [`Trace::root`] establishes an upper unwinding bound on
    /// the backtraces in `Trace`.
    active_frame: Cell<Option<NonNull<Frame>>>,
    /// The place to stash backtraces.
    collector: Cell<Option<Trace>>,
}

/// A [`Frame`] in an intrusive tree of [`Frame`]s, linked from child to
/// parent.
struct Frame {
    /// The location associated with this frame.
    inner_addr: *const c_void,

    /// The parent frame, if any.
    parent: Option<NonNull<Frame>>,
}

/// A tree-shaped execution trace.
///
/// Traces are captured with [`Trace::capture`], rooted with [`Trace::root`],
/// and have their leaves marked by [`trace_leaf`].
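///
/// A sketch of how the pieces fit together (these APIs are crate-internal, so
/// the snippet is illustrative rather than a runnable doc-test; `some_future`
/// and `poll_once` are hypothetical placeholders):
///
/// ```ignore
/// // Wrap the future so that backtraces stop unwinding at `Root::poll`.
/// let mut root = Trace::root(some_future);
/// // Poll it once; every `trace_leaf` reached during that poll pushes a
/// // backtrace into the returned `Trace`.
/// let (_poll, trace) = Trace::capture(|| poll_once(&mut root));
/// // `Trace` implements `Display`, rendering the backtraces as a tree.
/// println!("{trace}");
/// ```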
#[derive(Clone, Debug)]
pub(crate) struct Trace {
    // The linear backtraces that comprise this trace. These linear traces can
    // be re-knitted into a tree.
    backtraces: Vec<Backtrace>,
}

pin_project_lite::pin_project! {
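    /// Establishes the root [`Frame`] for a traced task: while the wrapped
    /// future is being polled, a frame recording the address of `Root::poll`
    /// sits at the head of the thread's frame list, bounding the backtraces
    /// captured by [`trace_leaf`].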
    #[derive(Debug, Clone)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub(crate) struct Root<T> {
        #[pin]
        future: T,
    }
}

const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
                                    as part of shutting down the current \
                                    thread, so collecting a taskdump is not \
                                    possible.";

impl Context {
    pub(crate) const fn new() -> Self {
        Context {
            active_frame: Cell::new(None),
            collector: Cell::new(None),
        }
    }

    /// SAFETY: Callers of this function must ensure that trace frames always
    /// form a valid linked list.
    unsafe fn try_with_current<F, R>(f: F) -> Option<R>
    where
        F: FnOnce(&Self) -> R,
    {
        crate::runtime::context::with_trace(f)
    }

    unsafe fn with_current_frame<F, R>(f: F) -> R
    where
        F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
    {
        Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
    }

    fn with_current_collector<F, R>(f: F) -> R
    where
        F: FnOnce(&Cell<Option<Trace>>) -> R,
    {
        // SAFETY: This call can only access the collector field, so it cannot
        // break the trace frame linked list.
        unsafe {
            Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
        }
    }

    /// Produces `true` if the current task is being traced; otherwise `false`.
    pub(crate) fn is_tracing() -> bool {
        Self::with_current_collector(|maybe_collector| {
            let collector = maybe_collector.take();
            let result = collector.is_some();
            maybe_collector.set(collector);
            result
        })
    }
}

impl Trace {
    /// Invokes `f`, returning both its result and the collection of backtraces
    /// captured at each sub-invocation of [`trace_leaf`].
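    ///
    /// A sketch of the call as it appears in `trace_owned` below (illustrative
    /// only; `local_notified` is a task handle obtained from `OwnedTasks`):
    ///
    /// ```ignore
    /// let ((), trace) = Trace::capture(|| local_notified.run());
    /// ```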
    #[inline(never)]
    pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
    where
        F: FnOnce() -> R,
    {
        let collector = Trace { backtraces: vec![] };

        let previous = Context::with_current_collector(|current| current.replace(Some(collector)));

        let result = f();

        let collector =
            Context::with_current_collector(|current| current.replace(previous)).unwrap();

        (result, collector)
    }

    /// The root of a trace.
    #[inline(never)]
    pub(crate) fn root<F>(future: F) -> Root<F> {
        Root { future }
    }
}

/// If this is a sub-invocation of [`Trace::capture`], capture a backtrace.
///
/// The captured backtrace will be returned by [`Trace::capture`].
///
/// Invoking this function does nothing when it is not a sub-invocation of
/// [`Trace::capture`].
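///
/// A sketch of a leaf yield point (the `poll` body shown is hypothetical and
/// illustrative only, since this API is crate-internal):
///
/// ```ignore
/// fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
///     // If a trace is being captured, this records a backtrace bounded by
///     // `Root::poll` and returns `Pending`; the waker is deferred so the
///     // task is polled again after tracing completes.
///     ready!(trace_leaf(cx));
///     Poll::Ready(())
/// }
/// ```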
// This function is marked `#[inline(never)]` to ensure that it gets a distinct `Frame` in the
// backtrace, below which frames should not be included in the backtrace (since they reflect the
// internal implementation details of this crate).
#[inline(never)]
pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
    // Safety: We don't manipulate the current context's active frame.
    let did_trace = unsafe {
        Context::try_with_current(|context_cell| {
            if let Some(mut collector) = context_cell.collector.take() {
                let mut frames = vec![];
                let mut above_leaf = false;

                if let Some(active_frame) = context_cell.active_frame.get() {
                    let active_frame = active_frame.as_ref();

                    backtrace::trace(|frame| {
                        let below_root = !ptr::eq(frame.symbol_address(), active_frame.inner_addr);

                        // only capture frames above `trace_leaf` and below
                        // `Trace::root`.
                        if above_leaf && below_root {
                            frames.push(frame.to_owned().into());
                        }

                        if ptr::eq(frame.symbol_address(), trace_leaf as *const _) {
                            above_leaf = true;
                        }

                        // only continue unwinding if we're below `Trace::root`
                        below_root
                    });
                }
                collector.backtraces.push(frames);
                context_cell.collector.set(Some(collector));
                true
            } else {
                false
            }
        })
        .unwrap_or(false)
    };

    if did_trace {
        // Use the same logic that `yield_now` uses to send out wakeups after
        // the task yields.
        context::with_scheduler(|scheduler| {
            if let Some(scheduler) = scheduler {
                match scheduler {
                    scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
                    #[cfg(feature = "rt-multi-thread")]
                    scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
                    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
                    scheduler::Context::MultiThreadAlt(_) => unimplemented!(),
                }
            }
        });

        Poll::Pending
    } else {
        Poll::Ready(())
    }
}

impl fmt::Display for Trace {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        Tree::from_trace(self.clone()).fmt(f)
    }
}

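/// Runs `f` when the returned guard is dropped, including during unwinding.
/// `Root::poll` relies on this to restore the previous active frame however
/// the inner poll exits.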
fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
    use std::mem::ManuallyDrop;

    struct Defer<F: FnOnce() -> R, R>(ManuallyDrop<F>);

    impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
        #[inline(always)]
        fn drop(&mut self) {
            unsafe {
                ManuallyDrop::take(&mut self.0)();
            }
        }
    }

    Defer(ManuallyDrop::new(f))
}

impl<T: Future> Future for Root<T> {
    type Output = T::Output;

    #[inline(never)]
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: The context's current frame is restored to its original state
        // before `frame` is dropped.
        unsafe {
            let mut frame = Frame {
                inner_addr: Self::poll as *const c_void,
                parent: None,
            };

            Context::with_current_frame(|current| {
                frame.parent = current.take();
                current.set(Some(NonNull::from(&frame)));
            });

            let _restore = defer(|| {
                Context::with_current_frame(|current| {
                    current.set(frame.parent);
                });
            });

            let this = self.project();
            this.future.poll(cx)
        }
    }
}

/// Trace and poll all tasks of the `current_thread` runtime.
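///
/// The local and injection queues are drained first; this satisfies the
/// precondition of `trace_owned`, which requires that the traced tasks not
/// exist in any other queue.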
pub(in crate::runtime) fn trace_current_thread(
    owned: &OwnedTasks<Arc<current_thread::Handle>>,
    local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
    injection: &Inject<Arc<current_thread::Handle>>,
) -> Vec<(Id, Trace)> {
    // clear the local and injection queues

    let mut dequeued = Vec::new();

    while let Some(task) = local.pop_back() {
        dequeued.push(task);
    }

    while let Some(task) = injection.pop() {
        dequeued.push(task);
    }

    // precondition: we have drained the tasks from the local and injection
    // queues.
    trace_owned(owned, dequeued)
}

cfg_rt_multi_thread! {
    use crate::loom::sync::Mutex;
    use crate::runtime::scheduler::multi_thread;
    use crate::runtime::scheduler::multi_thread::Synced;
    use crate::runtime::scheduler::inject::Shared;

    /// Trace and poll all tasks of the `multi_thread` runtime.
    ///
    /// ## Safety
    ///
    /// Must be called with the same `synced` that `injection` was created with.
    pub(in crate::runtime) unsafe fn trace_multi_thread(
        owned: &OwnedTasks<Arc<multi_thread::Handle>>,
        local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
        synced: &Mutex<Synced>,
        injection: &Shared<Arc<multi_thread::Handle>>,
    ) -> Vec<(Id, Trace)> {
        let mut dequeued = Vec::new();

        // clear the local queue
        while let Some(notified) = local.pop() {
            dequeued.push(notified);
        }

        // clear the injection queue
        let mut synced = synced.lock();
        while let Some(notified) = injection.pop(&mut synced.inject) {
            dequeued.push(notified);
        }

        drop(synced);

        // precondition: we have drained the tasks from the local and injection
        // queues.
        trace_owned(owned, dequeued)
    }
}

/// Trace the `OwnedTasks`.
///
/// # Preconditions
///
/// This helper presumes exclusive access to each task. The tasks must not exist
/// in any other queue.
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<(Id, Trace)> {
    let mut tasks = dequeued;
    // Notify and trace all un-notified tasks. The dequeued tasks are already
    // notified and so do not need to be re-notified.
    owned.for_each(|task| {
        // Notify the task (and thus make it poll-able) and stash it. This fails
        // if the task is already notified. In these cases, we skip tracing the
        // task.
        if let Some(notified) = task.notify_for_tracing() {
            tasks.push(notified);
        }
        // We do not poll tasks here, since we hold a lock on `owned` and the
        // task may complete and need to remove itself from `owned`. Polling
        // such a task here would result in a deadlock.
    });

    tasks
        .into_iter()
        .map(|task| {
            let local_notified = owned.assert_owner(task);
            let id = local_notified.task.id();
            let ((), trace) = Trace::capture(|| local_notified.run());
            (id, trace)
        })
        .collect()
}