1 use crate::loom::sync::{Mutex, MutexGuard};
2 use crate::runtime::signal::Handle as SignalHandle;
3 use crate::signal::unix::{signal_with_handle, SignalKind};
4 use crate::sync::watch;
5 use std::io;
6 use std::process::ExitStatus;
7
8 /// An interface for waiting on a process to exit.
9 pub(crate) trait Wait {
10 /// Get the identifier for this process or diagnostics.
11 #[allow(dead_code)]
id(&self) -> u3212 fn id(&self) -> u32;
13 /// Try waiting for a process to exit in a non-blocking manner.
try_wait(&mut self) -> io::Result<Option<ExitStatus>>14 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
15 }
16
17 impl<T: Wait> Wait for &mut T {
id(&self) -> u3218 fn id(&self) -> u32 {
19 (**self).id()
20 }
21
try_wait(&mut self) -> io::Result<Option<ExitStatus>>22 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
23 (**self).try_wait()
24 }
25 }
26
27 /// An interface for queueing up an orphaned process so that it can be reaped.
28 pub(crate) trait OrphanQueue<T> {
29 /// Adds an orphan to the queue.
push_orphan(&self, orphan: T)30 fn push_orphan(&self, orphan: T);
31 }
32
33 impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
push_orphan(&self, orphan: T)34 fn push_orphan(&self, orphan: T) {
35 (**self).push_orphan(orphan);
36 }
37 }
38
39 /// An implementation of `OrphanQueue`.
40 #[derive(Debug)]
41 pub(crate) struct OrphanQueueImpl<T> {
42 sigchild: Mutex<Option<watch::Receiver<()>>>,
43 queue: Mutex<Vec<T>>,
44 }
45
46 impl<T> OrphanQueueImpl<T> {
47 cfg_not_has_const_mutex_new! {
48 pub(crate) fn new() -> Self {
49 Self {
50 sigchild: Mutex::new(None),
51 queue: Mutex::new(Vec::new()),
52 }
53 }
54 }
55
56 cfg_has_const_mutex_new! {
57 pub(crate) const fn new() -> Self {
58 Self {
59 sigchild: Mutex::const_new(None),
60 queue: Mutex::const_new(Vec::new()),
61 }
62 }
63 }
64
65 #[cfg(test)]
len(&self) -> usize66 fn len(&self) -> usize {
67 self.queue.lock().len()
68 }
69
push_orphan(&self, orphan: T) where T: Wait,70 pub(crate) fn push_orphan(&self, orphan: T)
71 where
72 T: Wait,
73 {
74 self.queue.lock().push(orphan);
75 }
76
77 /// Attempts to reap every process in the queue, ignoring any errors and
78 /// enqueueing any orphans which have not yet exited.
reap_orphans(&self, handle: &SignalHandle) where T: Wait,79 pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
80 where
81 T: Wait,
82 {
83 // If someone else is holding the lock, they will be responsible for draining
84 // the queue as necessary, so we can safely bail if that happens
85 if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
86 match &mut *sigchild_guard {
87 Some(sigchild) => {
88 if sigchild.try_has_changed().and_then(Result::ok).is_some() {
89 drain_orphan_queue(self.queue.lock());
90 }
91 }
92 None => {
93 let queue = self.queue.lock();
94
95 // Be lazy and only initialize the SIGCHLD listener if there
96 // are any orphaned processes in the queue.
97 if !queue.is_empty() {
98 // An errors shouldn't really happen here, but if it does it
99 // means that the signal driver isn't running, in
100 // which case there isn't anything we can
101 // register/initialize here, so we can try again later
102 if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
103 *sigchild_guard = Some(sigchild);
104 drain_orphan_queue(queue);
105 }
106 }
107 }
108 }
109 }
110 }
111 }
112
drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) where T: Wait,113 fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
114 where
115 T: Wait,
116 {
117 for i in (0..queue.len()).rev() {
118 match queue[i].try_wait() {
119 Ok(None) => {}
120 Ok(Some(_)) | Err(_) => {
121 // The stdlib handles interruption errors (EINTR) when polling a child process.
122 // All other errors represent invalid inputs or pids that have already been
123 // reaped, so we can drop the orphan in case an error is raised.
124 queue.swap_remove(i);
125 }
126 }
127 }
128
129 drop(queue);
130 }
131
132 #[cfg(all(test, not(loom)))]
133 pub(crate) mod test {
134 use super::*;
135 use crate::runtime::io::Driver as IoDriver;
136 use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};
137 use crate::sync::watch;
138 use std::cell::{Cell, RefCell};
139 use std::io;
140 use std::os::unix::process::ExitStatusExt;
141 use std::process::ExitStatus;
142 use std::rc::Rc;
143
144 pub(crate) struct MockQueue<W> {
145 pub(crate) all_enqueued: RefCell<Vec<W>>,
146 }
147
148 impl<W> MockQueue<W> {
new() -> Self149 pub(crate) fn new() -> Self {
150 Self {
151 all_enqueued: RefCell::new(Vec::new()),
152 }
153 }
154 }
155
156 impl<W> OrphanQueue<W> for MockQueue<W> {
push_orphan(&self, orphan: W)157 fn push_orphan(&self, orphan: W) {
158 self.all_enqueued.borrow_mut().push(orphan);
159 }
160 }
161
162 struct MockWait {
163 total_waits: Rc<Cell<usize>>,
164 num_wait_until_status: usize,
165 return_err: bool,
166 }
167
168 impl MockWait {
new(num_wait_until_status: usize) -> Self169 fn new(num_wait_until_status: usize) -> Self {
170 Self {
171 total_waits: Rc::new(Cell::new(0)),
172 num_wait_until_status,
173 return_err: false,
174 }
175 }
176
with_err() -> Self177 fn with_err() -> Self {
178 Self {
179 total_waits: Rc::new(Cell::new(0)),
180 num_wait_until_status: 0,
181 return_err: true,
182 }
183 }
184 }
185
186 impl Wait for MockWait {
id(&self) -> u32187 fn id(&self) -> u32 {
188 42
189 }
190
try_wait(&mut self) -> io::Result<Option<ExitStatus>>191 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
192 let waits = self.total_waits.get();
193
194 let ret = if self.num_wait_until_status == waits {
195 if self.return_err {
196 Ok(Some(ExitStatus::from_raw(0)))
197 } else {
198 Err(io::Error::new(io::ErrorKind::Other, "mock err"))
199 }
200 } else {
201 Ok(None)
202 };
203
204 self.total_waits.set(waits + 1);
205 ret
206 }
207 }
208
209 #[test]
drain_attempts_a_single_reap_of_all_queued_orphans()210 fn drain_attempts_a_single_reap_of_all_queued_orphans() {
211 let first_orphan = MockWait::new(0);
212 let second_orphan = MockWait::new(1);
213 let third_orphan = MockWait::new(2);
214 let fourth_orphan = MockWait::with_err();
215
216 let first_waits = first_orphan.total_waits.clone();
217 let second_waits = second_orphan.total_waits.clone();
218 let third_waits = third_orphan.total_waits.clone();
219 let fourth_waits = fourth_orphan.total_waits.clone();
220
221 let orphanage = OrphanQueueImpl::new();
222 orphanage.push_orphan(first_orphan);
223 orphanage.push_orphan(third_orphan);
224 orphanage.push_orphan(second_orphan);
225 orphanage.push_orphan(fourth_orphan);
226
227 assert_eq!(orphanage.len(), 4);
228
229 drain_orphan_queue(orphanage.queue.lock());
230 assert_eq!(orphanage.len(), 2);
231 assert_eq!(first_waits.get(), 1);
232 assert_eq!(second_waits.get(), 1);
233 assert_eq!(third_waits.get(), 1);
234 assert_eq!(fourth_waits.get(), 1);
235
236 drain_orphan_queue(orphanage.queue.lock());
237 assert_eq!(orphanage.len(), 1);
238 assert_eq!(first_waits.get(), 1);
239 assert_eq!(second_waits.get(), 2);
240 assert_eq!(third_waits.get(), 2);
241 assert_eq!(fourth_waits.get(), 1);
242
243 drain_orphan_queue(orphanage.queue.lock());
244 assert_eq!(orphanage.len(), 0);
245 assert_eq!(first_waits.get(), 1);
246 assert_eq!(second_waits.get(), 2);
247 assert_eq!(third_waits.get(), 3);
248 assert_eq!(fourth_waits.get(), 1);
249
250 // Safe to reap when empty
251 drain_orphan_queue(orphanage.queue.lock());
252 }
253
254 #[test]
no_reap_if_no_signal_received()255 fn no_reap_if_no_signal_received() {
256 let (tx, rx) = watch::channel(());
257
258 let handle = SignalHandle::default();
259
260 let orphanage = OrphanQueueImpl::new();
261 *orphanage.sigchild.lock() = Some(rx);
262
263 let orphan = MockWait::new(2);
264 let waits = orphan.total_waits.clone();
265 orphanage.push_orphan(orphan);
266
267 orphanage.reap_orphans(&handle);
268 assert_eq!(waits.get(), 0);
269
270 orphanage.reap_orphans(&handle);
271 assert_eq!(waits.get(), 0);
272
273 tx.send(()).unwrap();
274 orphanage.reap_orphans(&handle);
275 assert_eq!(waits.get(), 1);
276 }
277
278 #[test]
no_reap_if_signal_lock_held()279 fn no_reap_if_signal_lock_held() {
280 let handle = SignalHandle::default();
281
282 let orphanage = OrphanQueueImpl::new();
283 let signal_guard = orphanage.sigchild.lock();
284
285 let orphan = MockWait::new(2);
286 let waits = orphan.total_waits.clone();
287 orphanage.push_orphan(orphan);
288
289 orphanage.reap_orphans(&handle);
290 assert_eq!(waits.get(), 0);
291
292 drop(signal_guard);
293 }
294
295 #[cfg_attr(miri, ignore)] // Miri does not support epoll.
296 #[test]
does_not_register_signal_if_queue_empty()297 fn does_not_register_signal_if_queue_empty() {
298 let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
299 let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
300 let handle = signal_driver.handle();
301
302 let orphanage = OrphanQueueImpl::new();
303 assert!(orphanage.sigchild.lock().is_none()); // Sanity
304
305 // No register when queue empty
306 orphanage.reap_orphans(&handle);
307 assert!(orphanage.sigchild.lock().is_none());
308
309 let orphan = MockWait::new(2);
310 let waits = orphan.total_waits.clone();
311 orphanage.push_orphan(orphan);
312
313 orphanage.reap_orphans(&handle);
314 assert!(orphanage.sigchild.lock().is_some());
315 assert_eq!(waits.get(), 1); // Eager reap when registering listener
316 }
317
318 #[test]
does_nothing_if_signal_could_not_be_registered()319 fn does_nothing_if_signal_could_not_be_registered() {
320 let handle = SignalHandle::default();
321
322 let orphanage = OrphanQueueImpl::new();
323 assert!(orphanage.sigchild.lock().is_none());
324
325 let orphan = MockWait::new(2);
326 let waits = orphan.total_waits.clone();
327 orphanage.push_orphan(orphan);
328
329 // Signal handler has "gone away", nothing to register or reap
330 orphanage.reap_orphans(&handle);
331 assert!(orphanage.sigchild.lock().is_none());
332 assert_eq!(waits.get(), 0);
333 }
334 }
335