mio/sys/unix/
pipe.rs

1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::io;
6use std::os::fd::RawFd;
7
8pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9    let mut fds: [RawFd; 2] = [-1, -1];
10
11    #[cfg(any(
12        target_os = "android",
13        target_os = "dragonfly",
14        target_os = "freebsd",
15        target_os = "fuchsia",
16        target_os = "hurd",
17        target_os = "linux",
18        target_os = "netbsd",
19        target_os = "openbsd",
20        target_os = "illumos",
21        target_os = "redox",
22        target_os = "solaris",
23        target_os = "vita",
24    ))]
25    unsafe {
26        if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
27            return Err(io::Error::last_os_error());
28        }
29    }
30
31    #[cfg(any(
32        target_os = "aix",
33        target_os = "haiku",
34        target_os = "ios",
35        target_os = "macos",
36        target_os = "tvos",
37        target_os = "visionos",
38        target_os = "watchos",
39        target_os = "espidf",
40        target_os = "nto",
41    ))]
42    unsafe {
43        // For platforms that don't have `pipe2(2)` we need to manually set the
44        // correct flags on the file descriptor.
45        if libc::pipe(fds.as_mut_ptr()) != 0 {
46            return Err(io::Error::last_os_error());
47        }
48
49        for fd in &fds {
50            if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
51                || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
52            {
53                let err = io::Error::last_os_error();
54                // Don't leak file descriptors. Can't handle closing error though.
55                let _ = libc::close(fds[0]);
56                let _ = libc::close(fds[1]);
57                return Err(err);
58            }
59        }
60    }
61
62    Ok(fds)
63}
64
65cfg_os_ext! {
66use std::fs::File;
67use std::io::{IoSlice, IoSliceMut, Read, Write};
68use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd};
69use std::process::{ChildStderr, ChildStdin, ChildStdout};
70
71use crate::io_source::IoSource;
72use crate::{event, Interest, Registry, Token};
73
74/// Create a new non-blocking Unix pipe.
75///
76/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
77/// inter-process or thread communication channel.
78///
79/// This channel may be created before forking the process and then one end used
80/// in each process, e.g. the parent process has the sending end to send command
81/// to the child process.
82///
83/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
84///
85/// # Events
86///
87/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
88/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
89/// written to the `Sender` the `Receiver` will receive an [readable event].
90///
91/// In addition to those events, events will also be generated if the other side
92/// is dropped. To check if the `Sender` is dropped you'll need to check
93/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
94/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
95/// returns true the `Receiver` was dropped. Also see the second example below.
96///
97/// [`WRITABLE`]: Interest::WRITABLE
98/// [writable events]: event::Event::is_writable
99/// [`READABLE`]: Interest::READABLE
100/// [readable event]: event::Event::is_readable
101/// [`is_read_closed`]: event::Event::is_read_closed
102/// [`is_write_closed`]: event::Event::is_write_closed
103///
104/// # Deregistering
105///
106/// Both `Sender` and `Receiver` will deregister themselves when dropped,
107/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
108///
109/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
110///
111/// # Examples
112///
113/// Simple example that writes data into the sending end and read it from the
114/// receiving end.
115///
116/// ```
117/// use std::io::{self, Read, Write};
118///
119/// use mio::{Poll, Events, Interest, Token};
120/// use mio::unix::pipe;
121///
122/// // Unique tokens for the two ends of the channel.
123/// const PIPE_RECV: Token = Token(0);
124/// const PIPE_SEND: Token = Token(1);
125///
126/// # fn main() -> io::Result<()> {
127/// // Create our `Poll` instance and the `Events` container.
128/// let mut poll = Poll::new()?;
129/// let mut events = Events::with_capacity(8);
130///
131/// // Create a new pipe.
132/// let (mut sender, mut receiver) = pipe::new()?;
133///
134/// // Register both ends of the channel.
135/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
136/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
137///
138/// const MSG: &[u8; 11] = b"Hello world";
139///
140/// loop {
141///     poll.poll(&mut events, None)?;
142///
143///     for event in events.iter() {
144///         match event.token() {
145///             PIPE_SEND => sender.write(MSG)
146///                 .and_then(|n| if n != MSG.len() {
147///                         // We'll consider a short write an error in this
148///                         // example. NOTE: we can't use `write_all` with
149///                         // non-blocking I/O.
150///                         Err(io::ErrorKind::WriteZero.into())
151///                     } else {
152///                         Ok(())
153///                     })?,
154///             PIPE_RECV => {
155///                 let mut buf = [0; 11];
156///                 let n = receiver.read(&mut buf)?;
157///                 println!("received: {:?}", &buf[0..n]);
158///                 assert_eq!(n, MSG.len());
159///                 assert_eq!(&buf, &*MSG);
160///                 return Ok(());
161///             },
162///             _ => unreachable!(),
163///         }
164///     }
165/// }
166/// # }
167/// ```
168///
169/// Example that receives an event once the `Sender` is dropped.
170///
171/// ```
172/// # use std::io;
173/// #
174/// # use mio::{Poll, Events, Interest, Token};
175/// # use mio::unix::pipe;
176/// #
177/// # const PIPE_RECV: Token = Token(0);
178/// # const PIPE_SEND: Token = Token(1);
179/// #
180/// # fn main() -> io::Result<()> {
181/// // Same setup as in the example above.
182/// let mut poll = Poll::new()?;
183/// let mut events = Events::with_capacity(8);
184///
185/// let (mut sender, mut receiver) = pipe::new()?;
186///
187/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
188/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
189///
190/// // Drop the sender.
191/// drop(sender);
192///
193/// poll.poll(&mut events, None)?;
194///
195/// for event in events.iter() {
196///     match event.token() {
197///         PIPE_RECV if event.is_read_closed() => {
198///             // Detected that the sender was dropped.
199///             println!("Sender dropped!");
200///             return Ok(());
201///         },
202///         _ => unreachable!(),
203///     }
204/// }
205/// # unreachable!();
206/// # }
207/// ```
208pub fn new() -> io::Result<(Sender, Receiver)> {
209    let fds = new_raw()?;
210    // SAFETY: `new_raw` initialised the `fds` above.
211    let r = unsafe { Receiver::from_raw_fd(fds[0]) };
212    let w = unsafe { Sender::from_raw_fd(fds[1]) };
213    Ok((w, r))
214}
215
216/// Sending end of an Unix pipe.
217///
218/// See [`new`] for documentation, including examples.
219#[derive(Debug)]
220pub struct Sender {
221    inner: IoSource<File>,
222}
223
224impl Sender {
225    /// Set the `Sender` into or out of non-blocking mode.
226    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
227        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
228    }
229
230    /// Execute an I/O operation ensuring that the socket receives more events
231    /// if it hits a [`WouldBlock`] error.
232    ///
233    /// # Notes
234    ///
235    /// This method is required to be called for **all** I/O operations to
236    /// ensure the user will receive events once the socket is ready again after
237    /// returning a [`WouldBlock`] error.
238    ///
239    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// # use std::error::Error;
245    /// #
246    /// # fn main() -> Result<(), Box<dyn Error>> {
247    /// use std::io;
248    /// use std::os::fd::AsRawFd;
249    /// use mio::unix::pipe;
250    ///
251    /// let (sender, receiver) = pipe::new()?;
252    ///
253    /// // Wait until the sender is writable...
254    ///
255    /// // Write to the sender using a direct libc call, of course the
256    /// // `io::Write` implementation would be easier to use.
257    /// let buf = b"hello";
258    /// let n = sender.try_io(|| {
259    ///     let buf_ptr = &buf as *const _ as *const _;
260    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
261    ///     if res != -1 {
262    ///         Ok(res as usize)
263    ///     } else {
264    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
265    ///         // should return `WouldBlock` error.
266    ///         Err(io::Error::last_os_error())
267    ///     }
268    /// })?;
269    /// eprintln!("write {} bytes", n);
270    ///
271    /// // Wait until the receiver is readable...
272    ///
273    /// // Read from the receiver using a direct libc call, of course the
274    /// // `io::Read` implementation would be easier to use.
275    /// let mut buf = [0; 512];
276    /// let n = receiver.try_io(|| {
277    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
278    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
279    ///     if res != -1 {
280    ///         Ok(res as usize)
281    ///     } else {
282    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
283    ///         // should return `WouldBlock` error.
284    ///         Err(io::Error::last_os_error())
285    ///     }
286    /// })?;
287    /// eprintln!("read {} bytes", n);
288    /// # Ok(())
289    /// # }
290    /// ```
291    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
292    where
293        F: FnOnce() -> io::Result<T>,
294    {
295        self.inner.do_io(|_| f())
296    }
297}
298
299impl event::Source for Sender {
300    fn register(
301        &mut self,
302        registry: &Registry,
303        token: Token,
304        interests: Interest,
305    ) -> io::Result<()> {
306        self.inner.register(registry, token, interests)
307    }
308
309    fn reregister(
310        &mut self,
311        registry: &Registry,
312        token: Token,
313        interests: Interest,
314    ) -> io::Result<()> {
315        self.inner.reregister(registry, token, interests)
316    }
317
318    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
319        self.inner.deregister(registry)
320    }
321}
322
323impl Write for Sender {
324    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
325        self.inner.do_io(|mut sender| sender.write(buf))
326    }
327
328    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
329        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
330    }
331
332    fn flush(&mut self) -> io::Result<()> {
333        self.inner.do_io(|mut sender| sender.flush())
334    }
335}
336
337impl Write for &Sender {
338    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
339        self.inner.do_io(|mut sender| sender.write(buf))
340    }
341
342    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
343        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
344    }
345
346    fn flush(&mut self) -> io::Result<()> {
347        self.inner.do_io(|mut sender| sender.flush())
348    }
349}
350
351/// # Notes
352///
353/// The underlying pipe is **not** set to non-blocking.
354impl From<ChildStdin> for Sender {
355    fn from(stdin: ChildStdin) -> Sender {
356        // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
357        unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
358    }
359}
360
361impl FromRawFd for Sender {
362    unsafe fn from_raw_fd(fd: RawFd) -> Sender {
363        Sender {
364            inner: IoSource::new(File::from_raw_fd(fd)),
365        }
366    }
367}
368
369impl AsRawFd for Sender {
370    fn as_raw_fd(&self) -> RawFd {
371        self.inner.as_raw_fd()
372    }
373}
374
375impl IntoRawFd for Sender {
376    fn into_raw_fd(self) -> RawFd {
377        self.inner.into_inner().into_raw_fd()
378    }
379}
380
381impl AsFd for Sender {
382    fn as_fd(&self) -> BorrowedFd<'_> {
383        self.inner.as_fd()
384    }
385}
386
387/// Receiving end of an Unix pipe.
388///
389/// See [`new`] for documentation, including examples.
390#[derive(Debug)]
391pub struct Receiver {
392    inner: IoSource<File>,
393}
394
395impl Receiver {
396    /// Set the `Receiver` into or out of non-blocking mode.
397    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
398        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
399    }
400
401    /// Execute an I/O operation ensuring that the socket receives more events
402    /// if it hits a [`WouldBlock`] error.
403    ///
404    /// # Notes
405    ///
406    /// This method is required to be called for **all** I/O operations to
407    /// ensure the user will receive events once the socket is ready again after
408    /// returning a [`WouldBlock`] error.
409    ///
410    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
411    ///
412    /// # Examples
413    ///
414    /// ```
415    /// # use std::error::Error;
416    /// #
417    /// # fn main() -> Result<(), Box<dyn Error>> {
418    /// use std::io;
419    /// use std::os::fd::AsRawFd;
420    /// use mio::unix::pipe;
421    ///
422    /// let (sender, receiver) = pipe::new()?;
423    ///
424    /// // Wait until the sender is writable...
425    ///
426    /// // Write to the sender using a direct libc call, of course the
427    /// // `io::Write` implementation would be easier to use.
428    /// let buf = b"hello";
429    /// let n = sender.try_io(|| {
430    ///     let buf_ptr = &buf as *const _ as *const _;
431    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
432    ///     if res != -1 {
433    ///         Ok(res as usize)
434    ///     } else {
435    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
436    ///         // should return `WouldBlock` error.
437    ///         Err(io::Error::last_os_error())
438    ///     }
439    /// })?;
440    /// eprintln!("write {} bytes", n);
441    ///
442    /// // Wait until the receiver is readable...
443    ///
444    /// // Read from the receiver using a direct libc call, of course the
445    /// // `io::Read` implementation would be easier to use.
446    /// let mut buf = [0; 512];
447    /// let n = receiver.try_io(|| {
448    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
449    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
450    ///     if res != -1 {
451    ///         Ok(res as usize)
452    ///     } else {
453    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
454    ///         // should return `WouldBlock` error.
455    ///         Err(io::Error::last_os_error())
456    ///     }
457    /// })?;
458    /// eprintln!("read {} bytes", n);
459    /// # Ok(())
460    /// # }
461    /// ```
462    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
463    where
464        F: FnOnce() -> io::Result<T>,
465    {
466        self.inner.do_io(|_| f())
467    }
468}
469
470impl event::Source for Receiver {
471    fn register(
472        &mut self,
473        registry: &Registry,
474        token: Token,
475        interests: Interest,
476    ) -> io::Result<()> {
477        self.inner.register(registry, token, interests)
478    }
479
480    fn reregister(
481        &mut self,
482        registry: &Registry,
483        token: Token,
484        interests: Interest,
485    ) -> io::Result<()> {
486        self.inner.reregister(registry, token, interests)
487    }
488
489    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
490        self.inner.deregister(registry)
491    }
492}
493
494impl Read for Receiver {
495    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
496        self.inner.do_io(|mut sender| sender.read(buf))
497    }
498
499    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
500        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
501    }
502}
503
504impl Read for &Receiver {
505    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
506        self.inner.do_io(|mut sender| sender.read(buf))
507    }
508
509    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
510        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
511    }
512}
513
514/// # Notes
515///
516/// The underlying pipe is **not** set to non-blocking.
517impl From<ChildStdout> for Receiver {
518    fn from(stdout: ChildStdout) -> Receiver {
519        // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
520        unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
521    }
522}
523
524/// # Notes
525///
526/// The underlying pipe is **not** set to non-blocking.
527impl From<ChildStderr> for Receiver {
528    fn from(stderr: ChildStderr) -> Receiver {
529        // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
530        unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
531    }
532}
533
534impl IntoRawFd for Receiver {
535    fn into_raw_fd(self) -> RawFd {
536        self.inner.into_inner().into_raw_fd()
537    }
538}
539
540impl AsRawFd for Receiver {
541    fn as_raw_fd(&self) -> RawFd {
542        self.inner.as_raw_fd()
543    }
544}
545
546impl FromRawFd for Receiver {
547    unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
548        Receiver {
549            inner: IoSource::new(File::from_raw_fd(fd)),
550        }
551    }
552}
553
554impl AsFd for Receiver {
555    fn as_fd(&self) -> BorrowedFd<'_> {
556        self.inner.as_fd()
557    }
558}
559
560#[cfg(not(any(target_os = "illumos", target_os = "solaris", target_os = "vita")))]
561fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
562    let value = nonblocking as libc::c_int;
563    if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
564        Err(io::Error::last_os_error())
565    } else {
566        Ok(())
567    }
568}
569
570#[cfg(any(target_os = "illumos", target_os = "solaris", target_os = "vita"))]
571fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
572    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
573    if flags < 0 {
574        return Err(io::Error::last_os_error());
575    }
576
577    let nflags = if nonblocking {
578        flags | libc::O_NONBLOCK
579    } else {
580        flags & !libc::O_NONBLOCK
581    };
582
583    if flags != nflags {
584        if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 {
585            return Err(io::Error::last_os_error());
586        }
587    }
588
589    Ok(())
590}
591} // `cfg_os_ext!`.