1 //! Unix handling of child processes.
2 //!
3 //! Right now the only "fancy" thing about this is how we implement the
4 //! `Future` implementation on `Child` to get the exit status. Unix offers
5 //! no way to register a child with epoll, and the only real way to get a
6 //! notification when a process exits is the SIGCHLD signal.
7 //!
8 //! Signal handling in general is *super* hairy and complicated, and it's even
9 //! more complicated here with the fact that signals are coalesced, so we may
10 //! not get a SIGCHLD-per-child.
11 //!
//! Our best approximation here is to check *all spawned processes* for all
//! SIGCHLD signals received. To do that we create a `Signal`, implemented in
//! the `crate::signal::unix` module, which is a stream over signals being received.
15 //!
16 //! Later when we poll the process's exit status we simply check to see if a
17 //! SIGCHLD has happened since we last checked, and while that returns "yes" we
18 //! keep trying.
19 //!
20 //! Note that this means that this isn't really scalable, but then again
21 //! processes in general aren't scalable (e.g. millions) so it shouldn't be that
22 //! bad in theory...
23 
24 pub(crate) mod orphan;
25 use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
26 
27 mod reap;
28 use reap::Reaper;
29 
30 #[cfg(all(target_os = "linux", feature = "rt"))]
31 mod pidfd_reaper;
32 
33 use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
34 use crate::process::kill::Kill;
35 use crate::process::SpawnedChild;
36 use crate::runtime::signal::Handle as SignalHandle;
37 use crate::signal::unix::{signal, Signal, SignalKind};
38 
39 use mio::event::Source;
40 use mio::unix::SourceFd;
41 use std::fmt;
42 use std::fs::File;
43 use std::future::Future;
44 use std::io;
45 use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
46 use std::pin::Pin;
47 use std::process::{Child as StdChild, ExitStatus, Stdio};
48 use std::task::Context;
49 use std::task::Poll;
50 
51 impl Wait for StdChild {
id(&self) -> u3252     fn id(&self) -> u32 {
53         self.id()
54     }
55 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>56     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
57         self.try_wait()
58     }
59 }
60 
61 impl Kill for StdChild {
kill(&mut self) -> io::Result<()>62     fn kill(&mut self) -> io::Result<()> {
63         self.kill()
64     }
65 }
66 
67 cfg_not_has_const_mutex_new! {
68     fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
69         use crate::util::once_cell::OnceCell;
70 
71         static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new();
72 
73         ORPHAN_QUEUE.get(OrphanQueueImpl::new)
74     }
75 }
76 
77 cfg_has_const_mutex_new! {
78     fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
79         static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
80 
81         &ORPHAN_QUEUE
82     }
83 }
84 
85 pub(crate) struct GlobalOrphanQueue;
86 
87 impl fmt::Debug for GlobalOrphanQueue {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result88     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
89         get_orphan_queue().fmt(fmt)
90     }
91 }
92 
93 impl GlobalOrphanQueue {
reap_orphans(handle: &SignalHandle)94     pub(crate) fn reap_orphans(handle: &SignalHandle) {
95         get_orphan_queue().reap_orphans(handle);
96     }
97 }
98 
99 impl OrphanQueue<StdChild> for GlobalOrphanQueue {
push_orphan(&self, orphan: StdChild)100     fn push_orphan(&self, orphan: StdChild) {
101         get_orphan_queue().push_orphan(orphan);
102     }
103 }
104 
105 #[must_use = "futures do nothing unless polled"]
106 pub(crate) enum Child {
107     SignalReaper(Reaper<StdChild, GlobalOrphanQueue, Signal>),
108     #[cfg(all(target_os = "linux", feature = "rt"))]
109     PidfdReaper(pidfd_reaper::PidfdReaper<StdChild, GlobalOrphanQueue>),
110 }
111 
112 impl fmt::Debug for Child {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result113     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
114         fmt.debug_struct("Child").field("pid", &self.id()).finish()
115     }
116 }
117 
spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild>118 pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> {
119     let mut child = cmd.spawn()?;
120     let stdin = child.stdin.take().map(stdio).transpose()?;
121     let stdout = child.stdout.take().map(stdio).transpose()?;
122     let stderr = child.stderr.take().map(stdio).transpose()?;
123 
124     #[cfg(all(target_os = "linux", feature = "rt"))]
125     match pidfd_reaper::PidfdReaper::new(child, GlobalOrphanQueue) {
126         Ok(pidfd_reaper) => {
127             return Ok(SpawnedChild {
128                 child: Child::PidfdReaper(pidfd_reaper),
129                 stdin,
130                 stdout,
131                 stderr,
132             })
133         }
134         Err((Some(err), _child)) => return Err(err),
135         Err((None, child_returned)) => child = child_returned,
136     }
137 
138     let signal = signal(SignalKind::child())?;
139 
140     Ok(SpawnedChild {
141         child: Child::SignalReaper(Reaper::new(child, GlobalOrphanQueue, signal)),
142         stdin,
143         stdout,
144         stderr,
145     })
146 }
147 
148 impl Child {
id(&self) -> u32149     pub(crate) fn id(&self) -> u32 {
150         match self {
151             Self::SignalReaper(signal_reaper) => signal_reaper.id(),
152             #[cfg(all(target_os = "linux", feature = "rt"))]
153             Self::PidfdReaper(pidfd_reaper) => pidfd_reaper.id(),
154         }
155     }
156 
std_child(&mut self) -> &mut StdChild157     fn std_child(&mut self) -> &mut StdChild {
158         match self {
159             Self::SignalReaper(signal_reaper) => signal_reaper.inner_mut(),
160             #[cfg(all(target_os = "linux", feature = "rt"))]
161             Self::PidfdReaper(pidfd_reaper) => pidfd_reaper.inner_mut(),
162         }
163     }
164 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>165     pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
166         self.std_child().try_wait()
167     }
168 }
169 
170 impl Kill for Child {
kill(&mut self) -> io::Result<()>171     fn kill(&mut self) -> io::Result<()> {
172         self.std_child().kill()
173     }
174 }
175 
176 impl Future for Child {
177     type Output = io::Result<ExitStatus>;
178 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>179     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180         match Pin::into_inner(self) {
181             Self::SignalReaper(signal_reaper) => Pin::new(signal_reaper).poll(cx),
182             #[cfg(all(target_os = "linux", feature = "rt"))]
183             Self::PidfdReaper(pidfd_reaper) => Pin::new(pidfd_reaper).poll(cx),
184         }
185     }
186 }
187 
/// Owning wrapper around a raw file descriptor taken from a child's stdio.
///
/// Reads and writes through `&Pipe` are forwarded to the wrapped descriptor,
/// and the descriptor is closed when the `Pipe` is dropped.
#[derive(Debug)]
pub(crate) struct Pipe {
    // Actually a pipe is not a File. However, we are reusing `File` to get
    // close on drop. This is a similar trick as `mio`.
    fd: File,
}

impl<T: IntoRawFd> From<T> for Pipe {
    /// Takes ownership of the descriptor held by `fd`.
    fn from(fd: T) -> Self {
        // SAFETY: `into_raw_fd` consumes `fd` and transfers ownership of its
        // descriptor to us, so the new `File` is the descriptor's only owner.
        let fd = unsafe { File::from_raw_fd(fd.into_raw_fd()) };
        Self { fd }
    }
}

impl<'a> io::Read for &'a Pipe {
    /// Reads from the wrapped descriptor via `&File`'s `Read` impl.
    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
        (&self.fd).read(bytes)
    }
}

impl<'a> io::Write for &'a Pipe {
    /// Writes to the wrapped descriptor via `&File`'s `Write` impl.
    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
        (&self.fd).write(bytes)
    }

    /// Forwards to `&File`'s `flush`.
    fn flush(&mut self) -> io::Result<()> {
        (&self.fd).flush()
    }

    /// Forwards to `&File`'s vectored write.
    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
        (&self.fd).write_vectored(bufs)
    }
}

impl AsRawFd for Pipe {
    /// Returns the raw descriptor without giving up ownership.
    fn as_raw_fd(&self) -> RawFd {
        self.fd.as_raw_fd()
    }
}

impl AsFd for Pipe {
    /// Borrows the descriptor for as long as `self` is borrowed.
    fn as_fd(&self) -> BorrowedFd<'_> {
        // `File` already offers a safe borrowed view of its descriptor, so no
        // `unsafe` round-trip through the raw fd is needed.
        self.fd.as_fd()
    }
}
233 
convert_to_blocking_file(io: ChildStdio) -> io::Result<File>234 fn convert_to_blocking_file(io: ChildStdio) -> io::Result<File> {
235     let mut fd = io.inner.into_inner()?.fd;
236 
237     // Ensure that the fd to be inherited is set to *blocking* mode, as this
238     // is the default that virtually all programs expect to have. Those
239     // programs that know how to work with nonblocking stdio will know how to
240     // change it to nonblocking mode.
241     set_nonblocking(&mut fd, false)?;
242 
243     Ok(fd)
244 }
245 
convert_to_stdio(io: ChildStdio) -> io::Result<Stdio>246 pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
247     convert_to_blocking_file(io).map(Stdio::from)
248 }
249 
250 impl Source for Pipe {
register( &mut self, registry: &mio::Registry, token: mio::Token, interest: mio::Interest, ) -> io::Result<()>251     fn register(
252         &mut self,
253         registry: &mio::Registry,
254         token: mio::Token,
255         interest: mio::Interest,
256     ) -> io::Result<()> {
257         SourceFd(&self.as_raw_fd()).register(registry, token, interest)
258     }
259 
reregister( &mut self, registry: &mio::Registry, token: mio::Token, interest: mio::Interest, ) -> io::Result<()>260     fn reregister(
261         &mut self,
262         registry: &mio::Registry,
263         token: mio::Token,
264         interest: mio::Interest,
265     ) -> io::Result<()> {
266         SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
267     }
268 
deregister(&mut self, registry: &mio::Registry) -> io::Result<()>269     fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
270         SourceFd(&self.as_raw_fd()).deregister(registry)
271     }
272 }
273 
274 pub(crate) struct ChildStdio {
275     inner: PollEvented<Pipe>,
276 }
277 
278 impl ChildStdio {
into_owned_fd(self) -> io::Result<OwnedFd>279     pub(super) fn into_owned_fd(self) -> io::Result<OwnedFd> {
280         convert_to_blocking_file(self).map(OwnedFd::from)
281     }
282 }
283 
284 impl fmt::Debug for ChildStdio {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result285     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
286         self.inner.fmt(fmt)
287     }
288 }
289 
290 impl AsRawFd for ChildStdio {
as_raw_fd(&self) -> RawFd291     fn as_raw_fd(&self) -> RawFd {
292         self.inner.as_raw_fd()
293     }
294 }
295 
296 impl AsFd for ChildStdio {
as_fd(&self) -> BorrowedFd<'_>297     fn as_fd(&self) -> BorrowedFd<'_> {
298         unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
299     }
300 }
301 
302 impl AsyncWrite for ChildStdio {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>303     fn poll_write(
304         self: Pin<&mut Self>,
305         cx: &mut Context<'_>,
306         buf: &[u8],
307     ) -> Poll<io::Result<usize>> {
308         self.inner.poll_write(cx, buf)
309     }
310 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>311     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
312         Poll::Ready(Ok(()))
313     }
314 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>315     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
316         Poll::Ready(Ok(()))
317     }
318 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<Result<usize, io::Error>>319     fn poll_write_vectored(
320         self: Pin<&mut Self>,
321         cx: &mut Context<'_>,
322         bufs: &[io::IoSlice<'_>],
323     ) -> Poll<Result<usize, io::Error>> {
324         self.inner.poll_write_vectored(cx, bufs)
325     }
326 
is_write_vectored(&self) -> bool327     fn is_write_vectored(&self) -> bool {
328         true
329     }
330 }
331 
332 impl AsyncRead for ChildStdio {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>333     fn poll_read(
334         self: Pin<&mut Self>,
335         cx: &mut Context<'_>,
336         buf: &mut ReadBuf<'_>,
337     ) -> Poll<io::Result<()>> {
338         // Safety: pipes support reading into uninitialized memory
339         unsafe { self.inner.poll_read(cx, buf) }
340     }
341 }
342 
set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>343 fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
344     unsafe {
345         let fd = fd.as_raw_fd();
346         let previous = libc::fcntl(fd, libc::F_GETFL);
347         if previous == -1 {
348             return Err(io::Error::last_os_error());
349         }
350 
351         let new = if nonblocking {
352             previous | libc::O_NONBLOCK
353         } else {
354             previous & !libc::O_NONBLOCK
355         };
356 
357         let r = libc::fcntl(fd, libc::F_SETFL, new);
358         if r == -1 {
359             return Err(io::Error::last_os_error());
360         }
361     }
362 
363     Ok(())
364 }
365 
stdio<T>(io: T) -> io::Result<ChildStdio> where T: IntoRawFd,366 pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
367 where
368     T: IntoRawFd,
369 {
370     // Set the fd to nonblocking before we pass it to the event loop
371     let mut pipe = Pipe::from(io);
372     set_nonblocking(&mut pipe, true)?;
373 
374     PollEvented::new(pipe).map(|inner| ChildStdio { inner })
375 }
376