1 use crate::loom::sync::{Mutex, MutexGuard};
2 use crate::runtime::signal::Handle as SignalHandle;
3 use crate::signal::unix::{signal_with_handle, SignalKind};
4 use crate::sync::watch;
5 use std::io;
6 use std::process::ExitStatus;
7 
8 /// An interface for waiting on a process to exit.
9 pub(crate) trait Wait {
10     /// Get the identifier for this process or diagnostics.
11     #[allow(dead_code)]
id(&self) -> u3212     fn id(&self) -> u32;
13     /// Try waiting for a process to exit in a non-blocking manner.
try_wait(&mut self) -> io::Result<Option<ExitStatus>>14     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
15 }
16 
17 impl<T: Wait> Wait for &mut T {
id(&self) -> u3218     fn id(&self) -> u32 {
19         (**self).id()
20     }
21 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>22     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
23         (**self).try_wait()
24     }
25 }
26 
27 /// An interface for queueing up an orphaned process so that it can be reaped.
28 pub(crate) trait OrphanQueue<T> {
29     /// Adds an orphan to the queue.
push_orphan(&self, orphan: T)30     fn push_orphan(&self, orphan: T);
31 }
32 
33 impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
push_orphan(&self, orphan: T)34     fn push_orphan(&self, orphan: T) {
35         (**self).push_orphan(orphan);
36     }
37 }
38 
39 /// An implementation of `OrphanQueue`.
40 #[derive(Debug)]
41 pub(crate) struct OrphanQueueImpl<T> {
42     sigchild: Mutex<Option<watch::Receiver<()>>>,
43     queue: Mutex<Vec<T>>,
44 }
45 
46 impl<T> OrphanQueueImpl<T> {
47     cfg_not_has_const_mutex_new! {
48         pub(crate) fn new() -> Self {
49             Self {
50                 sigchild: Mutex::new(None),
51                 queue: Mutex::new(Vec::new()),
52             }
53         }
54     }
55 
56     cfg_has_const_mutex_new! {
57         pub(crate) const fn new() -> Self {
58             Self {
59                 sigchild: Mutex::const_new(None),
60                 queue: Mutex::const_new(Vec::new()),
61             }
62         }
63     }
64 
65     #[cfg(test)]
len(&self) -> usize66     fn len(&self) -> usize {
67         self.queue.lock().len()
68     }
69 
push_orphan(&self, orphan: T) where T: Wait,70     pub(crate) fn push_orphan(&self, orphan: T)
71     where
72         T: Wait,
73     {
74         self.queue.lock().push(orphan);
75     }
76 
77     /// Attempts to reap every process in the queue, ignoring any errors and
78     /// enqueueing any orphans which have not yet exited.
reap_orphans(&self, handle: &SignalHandle) where T: Wait,79     pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
80     where
81         T: Wait,
82     {
83         // If someone else is holding the lock, they will be responsible for draining
84         // the queue as necessary, so we can safely bail if that happens
85         if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
86             match &mut *sigchild_guard {
87                 Some(sigchild) => {
88                     if sigchild.try_has_changed().and_then(Result::ok).is_some() {
89                         drain_orphan_queue(self.queue.lock());
90                     }
91                 }
92                 None => {
93                     let queue = self.queue.lock();
94 
95                     // Be lazy and only initialize the SIGCHLD listener if there
96                     // are any orphaned processes in the queue.
97                     if !queue.is_empty() {
98                         // An errors shouldn't really happen here, but if it does it
99                         // means that the signal driver isn't running, in
100                         // which case there isn't anything we can
101                         // register/initialize here, so we can try again later
102                         if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
103                             *sigchild_guard = Some(sigchild);
104                             drain_orphan_queue(queue);
105                         }
106                     }
107                 }
108             }
109         }
110     }
111 }
112 
drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) where T: Wait,113 fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
114 where
115     T: Wait,
116 {
117     for i in (0..queue.len()).rev() {
118         match queue[i].try_wait() {
119             Ok(None) => {}
120             Ok(Some(_)) | Err(_) => {
121                 // The stdlib handles interruption errors (EINTR) when polling a child process.
122                 // All other errors represent invalid inputs or pids that have already been
123                 // reaped, so we can drop the orphan in case an error is raised.
124                 queue.swap_remove(i);
125             }
126         }
127     }
128 
129     drop(queue);
130 }
131 
132 #[cfg(all(test, not(loom)))]
133 pub(crate) mod test {
134     use super::*;
135     use crate::runtime::io::Driver as IoDriver;
136     use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};
137     use crate::sync::watch;
138     use std::cell::{Cell, RefCell};
139     use std::io;
140     use std::os::unix::process::ExitStatusExt;
141     use std::process::ExitStatus;
142     use std::rc::Rc;
143 
144     pub(crate) struct MockQueue<W> {
145         pub(crate) all_enqueued: RefCell<Vec<W>>,
146     }
147 
148     impl<W> MockQueue<W> {
new() -> Self149         pub(crate) fn new() -> Self {
150             Self {
151                 all_enqueued: RefCell::new(Vec::new()),
152             }
153         }
154     }
155 
156     impl<W> OrphanQueue<W> for MockQueue<W> {
push_orphan(&self, orphan: W)157         fn push_orphan(&self, orphan: W) {
158             self.all_enqueued.borrow_mut().push(orphan);
159         }
160     }
161 
162     struct MockWait {
163         total_waits: Rc<Cell<usize>>,
164         num_wait_until_status: usize,
165         return_err: bool,
166     }
167 
168     impl MockWait {
new(num_wait_until_status: usize) -> Self169         fn new(num_wait_until_status: usize) -> Self {
170             Self {
171                 total_waits: Rc::new(Cell::new(0)),
172                 num_wait_until_status,
173                 return_err: false,
174             }
175         }
176 
with_err() -> Self177         fn with_err() -> Self {
178             Self {
179                 total_waits: Rc::new(Cell::new(0)),
180                 num_wait_until_status: 0,
181                 return_err: true,
182             }
183         }
184     }
185 
186     impl Wait for MockWait {
id(&self) -> u32187         fn id(&self) -> u32 {
188             42
189         }
190 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>191         fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
192             let waits = self.total_waits.get();
193 
194             let ret = if self.num_wait_until_status == waits {
195                 if self.return_err {
196                     Ok(Some(ExitStatus::from_raw(0)))
197                 } else {
198                     Err(io::Error::new(io::ErrorKind::Other, "mock err"))
199                 }
200             } else {
201                 Ok(None)
202             };
203 
204             self.total_waits.set(waits + 1);
205             ret
206         }
207     }
208 
209     #[test]
drain_attempts_a_single_reap_of_all_queued_orphans()210     fn drain_attempts_a_single_reap_of_all_queued_orphans() {
211         let first_orphan = MockWait::new(0);
212         let second_orphan = MockWait::new(1);
213         let third_orphan = MockWait::new(2);
214         let fourth_orphan = MockWait::with_err();
215 
216         let first_waits = first_orphan.total_waits.clone();
217         let second_waits = second_orphan.total_waits.clone();
218         let third_waits = third_orphan.total_waits.clone();
219         let fourth_waits = fourth_orphan.total_waits.clone();
220 
221         let orphanage = OrphanQueueImpl::new();
222         orphanage.push_orphan(first_orphan);
223         orphanage.push_orphan(third_orphan);
224         orphanage.push_orphan(second_orphan);
225         orphanage.push_orphan(fourth_orphan);
226 
227         assert_eq!(orphanage.len(), 4);
228 
229         drain_orphan_queue(orphanage.queue.lock());
230         assert_eq!(orphanage.len(), 2);
231         assert_eq!(first_waits.get(), 1);
232         assert_eq!(second_waits.get(), 1);
233         assert_eq!(third_waits.get(), 1);
234         assert_eq!(fourth_waits.get(), 1);
235 
236         drain_orphan_queue(orphanage.queue.lock());
237         assert_eq!(orphanage.len(), 1);
238         assert_eq!(first_waits.get(), 1);
239         assert_eq!(second_waits.get(), 2);
240         assert_eq!(third_waits.get(), 2);
241         assert_eq!(fourth_waits.get(), 1);
242 
243         drain_orphan_queue(orphanage.queue.lock());
244         assert_eq!(orphanage.len(), 0);
245         assert_eq!(first_waits.get(), 1);
246         assert_eq!(second_waits.get(), 2);
247         assert_eq!(third_waits.get(), 3);
248         assert_eq!(fourth_waits.get(), 1);
249 
250         // Safe to reap when empty
251         drain_orphan_queue(orphanage.queue.lock());
252     }
253 
254     #[test]
no_reap_if_no_signal_received()255     fn no_reap_if_no_signal_received() {
256         let (tx, rx) = watch::channel(());
257 
258         let handle = SignalHandle::default();
259 
260         let orphanage = OrphanQueueImpl::new();
261         *orphanage.sigchild.lock() = Some(rx);
262 
263         let orphan = MockWait::new(2);
264         let waits = orphan.total_waits.clone();
265         orphanage.push_orphan(orphan);
266 
267         orphanage.reap_orphans(&handle);
268         assert_eq!(waits.get(), 0);
269 
270         orphanage.reap_orphans(&handle);
271         assert_eq!(waits.get(), 0);
272 
273         tx.send(()).unwrap();
274         orphanage.reap_orphans(&handle);
275         assert_eq!(waits.get(), 1);
276     }
277 
278     #[test]
no_reap_if_signal_lock_held()279     fn no_reap_if_signal_lock_held() {
280         let handle = SignalHandle::default();
281 
282         let orphanage = OrphanQueueImpl::new();
283         let signal_guard = orphanage.sigchild.lock();
284 
285         let orphan = MockWait::new(2);
286         let waits = orphan.total_waits.clone();
287         orphanage.push_orphan(orphan);
288 
289         orphanage.reap_orphans(&handle);
290         assert_eq!(waits.get(), 0);
291 
292         drop(signal_guard);
293     }
294 
295     #[cfg_attr(miri, ignore)] // Miri does not support epoll.
296     #[test]
does_not_register_signal_if_queue_empty()297     fn does_not_register_signal_if_queue_empty() {
298         let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
299         let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
300         let handle = signal_driver.handle();
301 
302         let orphanage = OrphanQueueImpl::new();
303         assert!(orphanage.sigchild.lock().is_none()); // Sanity
304 
305         // No register when queue empty
306         orphanage.reap_orphans(&handle);
307         assert!(orphanage.sigchild.lock().is_none());
308 
309         let orphan = MockWait::new(2);
310         let waits = orphan.total_waits.clone();
311         orphanage.push_orphan(orphan);
312 
313         orphanage.reap_orphans(&handle);
314         assert!(orphanage.sigchild.lock().is_some());
315         assert_eq!(waits.get(), 1); // Eager reap when registering listener
316     }
317 
318     #[test]
does_nothing_if_signal_could_not_be_registered()319     fn does_nothing_if_signal_could_not_be_registered() {
320         let handle = SignalHandle::default();
321 
322         let orphanage = OrphanQueueImpl::new();
323         assert!(orphanage.sigchild.lock().is_none());
324 
325         let orphan = MockWait::new(2);
326         let waits = orphan.total_waits.clone();
327         orphanage.push_orphan(orphan);
328 
329         // Signal handler has "gone away", nothing to register or reap
330         orphanage.reap_orphans(&handle);
331         assert!(orphanage.sigchild.lock().is_none());
332         assert_eq!(waits.get(), 0);
333     }
334 }
335