tokio/process/unix/
pidfd_reaper.rs

1use crate::{
2    io::{interest::Interest, PollEvented},
3    process::{
4        imp::{orphan::Wait, OrphanQueue},
5        kill::Kill,
6    },
7};
8
9use libc::{syscall, SYS_pidfd_open, ENOSYS, PIDFD_NONBLOCK};
10use mio::{event::Source, unix::SourceFd};
11use std::{
12    fs::File,
13    future::Future,
14    io,
15    marker::Unpin,
16    ops::Deref,
17    os::unix::io::{AsRawFd, FromRawFd, RawFd},
18    pin::Pin,
19    process::ExitStatus,
20    sync::atomic::{AtomicBool, Ordering::Relaxed},
21    task::{Context, Poll},
22};
23
24#[derive(Debug)]
25struct Pidfd {
26    fd: File,
27}
28
29impl Pidfd {
30    fn open(pid: u32) -> Option<Pidfd> {
31        // Store false (0) to reduce executable size
32        static NO_PIDFD_SUPPORT: AtomicBool = AtomicBool::new(false);
33
34        if NO_PIDFD_SUPPORT.load(Relaxed) {
35            return None;
36        }
37
38        // Safety: The following function calls invovkes syscall pidfd_open,
39        // which takes two parameter: pidfd_open(fd: c_int, flag: c_int)
40        let fd = unsafe { syscall(SYS_pidfd_open, pid, PIDFD_NONBLOCK) };
41        if fd == -1 {
42            let errno = io::Error::last_os_error().raw_os_error().unwrap();
43
44            if errno == ENOSYS {
45                NO_PIDFD_SUPPORT.store(true, Relaxed)
46            }
47
48            None
49        } else {
50            // Safety: pidfd_open returns -1 on error or a valid fd with ownership.
51            Some(Pidfd {
52                fd: unsafe { File::from_raw_fd(fd as i32) },
53            })
54        }
55    }
56}
57
58impl AsRawFd for Pidfd {
59    fn as_raw_fd(&self) -> RawFd {
60        self.fd.as_raw_fd()
61    }
62}
63
64impl Source for Pidfd {
65    fn register(
66        &mut self,
67        registry: &mio::Registry,
68        token: mio::Token,
69        interest: mio::Interest,
70    ) -> io::Result<()> {
71        SourceFd(&self.as_raw_fd()).register(registry, token, interest)
72    }
73
74    fn reregister(
75        &mut self,
76        registry: &mio::Registry,
77        token: mio::Token,
78        interest: mio::Interest,
79    ) -> io::Result<()> {
80        SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
81    }
82
83    fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
84        SourceFd(&self.as_raw_fd()).deregister(registry)
85    }
86}
87
88#[derive(Debug)]
89struct PidfdReaperInner<W>
90where
91    W: Unpin,
92{
93    inner: W,
94    pidfd: PollEvented<Pidfd>,
95}
96
97impl<W> Future for PidfdReaperInner<W>
98where
99    W: Wait + Unpin,
100{
101    type Output = io::Result<ExitStatus>;
102
103    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104        let this = Pin::into_inner(self);
105
106        match this.pidfd.registration().poll_read_ready(cx) {
107            Poll::Ready(Ok(evt)) => {
108                if let Some(exit_code) = this.inner.try_wait()? {
109                    return Poll::Ready(Ok(exit_code));
110                }
111                this.pidfd.registration().clear_readiness(evt);
112            }
113            Poll::Ready(Err(err)) if crate::runtime::is_rt_shutdown_err(&err) => {}
114            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
115            Poll::Pending => return Poll::Pending,
116        };
117
118        this.pidfd.reregister(Interest::READABLE)?;
119        cx.waker().wake_by_ref();
120        Poll::Pending
121    }
122}
123
124#[derive(Debug)]
125pub(crate) struct PidfdReaper<W, Q>
126where
127    W: Wait + Unpin,
128    Q: OrphanQueue<W> + Unpin,
129{
130    inner: Option<PidfdReaperInner<W>>,
131    orphan_queue: Q,
132}
133
134impl<W, Q> Deref for PidfdReaper<W, Q>
135where
136    W: Wait + Unpin,
137    Q: OrphanQueue<W> + Unpin,
138{
139    type Target = W;
140
141    fn deref(&self) -> &Self::Target {
142        &self.inner.as_ref().expect("inner has gone away").inner
143    }
144}
145
146impl<W, Q> PidfdReaper<W, Q>
147where
148    W: Wait + Unpin,
149    Q: OrphanQueue<W> + Unpin,
150{
151    pub(crate) fn new(inner: W, orphan_queue: Q) -> Result<Self, (Option<io::Error>, W)> {
152        if let Some(pidfd) = Pidfd::open(inner.id()) {
153            match PollEvented::new_with_interest(pidfd, Interest::READABLE) {
154                Ok(pidfd) => Ok(Self {
155                    inner: Some(PidfdReaperInner { pidfd, inner }),
156                    orphan_queue,
157                }),
158                Err(io_error) => Err((Some(io_error), inner)),
159            }
160        } else {
161            Err((None, inner))
162        }
163    }
164
165    pub(crate) fn inner_mut(&mut self) -> &mut W {
166        &mut self.inner.as_mut().expect("inner has gone away").inner
167    }
168}
169
170impl<W, Q> Future for PidfdReaper<W, Q>
171where
172    W: Wait + Unpin,
173    Q: OrphanQueue<W> + Unpin,
174{
175    type Output = io::Result<ExitStatus>;
176
177    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
178        Pin::new(
179            Pin::into_inner(self)
180                .inner
181                .as_mut()
182                .expect("inner has gone away"),
183        )
184        .poll(cx)
185    }
186}
187
188impl<W, Q> Kill for PidfdReaper<W, Q>
189where
190    W: Wait + Unpin + Kill,
191    Q: OrphanQueue<W> + Unpin,
192{
193    fn kill(&mut self) -> io::Result<()> {
194        self.inner_mut().kill()
195    }
196}
197
198impl<W, Q> Drop for PidfdReaper<W, Q>
199where
200    W: Wait + Unpin,
201    Q: OrphanQueue<W> + Unpin,
202{
203    fn drop(&mut self) {
204        let mut orphan = self.inner.take().expect("inner has gone away").inner;
205        if let Ok(Some(_)) = orphan.try_wait() {
206            return;
207        }
208
209        self.orphan_queue.push_orphan(orphan);
210    }
211}
212
#[cfg(all(test, not(loom), not(miri)))]
mod test {
    use super::*;
    use crate::{
        process::unix::orphan::test::MockQueue,
        runtime::{Builder as RuntimeBuilder, Runtime},
    };
    use std::process::Command;

    /// Builds a current-thread runtime with the I/O driver enabled.
    fn create_runtime() -> Runtime {
        let mut builder = RuntimeBuilder::new_current_thread();
        builder.enable_io();
        builder.build().unwrap()
    }

    /// Drives `fut` to completion on a fresh runtime.
    fn run_test(fut: impl Future<Output = ()>) {
        let runtime = create_runtime();
        runtime.block_on(fut)
    }

    /// Reports whether the running kernel is version 5.10 or newer, judging
    /// by the output of `uname -r`.
    fn is_pidfd_available() -> bool {
        let output = Command::new("uname").arg("-r").output().unwrap();
        assert!(output.status.success());
        let release = String::from_utf8_lossy(&output.stdout);

        // Only the part before the first '-' (if any) holds the version.
        let version = release
            .split_once('-')
            .map_or(&*release, |(version, _)| version);
        let mut components = version.split('.');

        let major: u32 = components.next().unwrap().parse().unwrap();
        let minor: u32 = components.next().unwrap().trim().parse().unwrap();

        match major {
            0..=4 => false,
            5 => minor >= 10,
            _ => true,
        }
    }

    /// Returns `true`, after printing a notice, when the kernel check fails
    /// and the calling test should bail out.
    fn should_skip() -> bool {
        let unavailable = !is_pidfd_available();
        if unavailable {
            eprintln!("pidfd is not available on this linux kernel, skip this test");
        }
        unavailable
    }

    /// Awaiting the reaper of a short-lived child yields a successful exit
    /// status and enqueues nothing.
    #[test]
    fn test_pidfd_reaper_poll() {
        if should_skip() {
            return;
        }

        let queue = MockQueue::new();

        run_test(async {
            let child = Command::new("true").spawn().unwrap();
            let reaper = PidfdReaper::new(child, &queue).unwrap();

            let status = reaper.await.unwrap();
            assert!(status.success());
        });

        assert!(queue.all_enqueued.borrow().is_empty());
    }

    /// Killing a long-running child through the reaper makes the reaper
    /// resolve with an unsuccessful exit status and enqueues nothing.
    #[test]
    fn test_pidfd_reaper_kill() {
        if should_skip() {
            return;
        }

        let queue = MockQueue::new();

        run_test(async {
            let child = Command::new("sleep").arg("1800").spawn().unwrap();
            let mut reaper = PidfdReaper::new(child, &queue).unwrap();

            reaper.kill().unwrap();

            let status = reaper.await.unwrap();
            assert!(!status.success());
        });

        assert!(queue.all_enqueued.borrow().is_empty());
    }

    /// Dropping the reaper while the child is still running pushes exactly
    /// one entry onto the orphan queue.
    #[test]
    fn test_pidfd_reaper_drop() {
        if should_skip() {
            return;
        }

        let queue = MockQueue::new();

        let mut child = Command::new("sleep").arg("1800").spawn().unwrap();

        run_test(async {
            let _reaper = PidfdReaper::new(&mut child, &queue).unwrap();
        });

        assert_eq!(queue.all_enqueued.borrow().len(), 1);

        // Clean up the child spawned above.
        child.kill().unwrap();
        child.wait().unwrap();
    }
}