//! POSIX message queue functions
2 //!
3 //! # Example
4 //!
5 // no_run because a kernel module may be required.
6 //! ```no_run
7 //! # use std::ffi::CString;
8 //! # use nix::mqueue::*;
9 //! use nix::sys::stat::Mode;
10 //!
11 //! const MSG_SIZE: mq_attr_member_t = 32;
//! let mq_name = "/a_nix_test_queue";
13 //!
14 //! let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
15 //! let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
16 //! let mqd0 = mq_open(mq_name, oflag0, mode, None).unwrap();
17 //! let msg_to_send = b"msg_1";
18 //! mq_send(&mqd0, msg_to_send, 1).unwrap();
19 //!
20 //! let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY;
21 //! let mqd1 = mq_open(mq_name, oflag1, mode, None).unwrap();
22 //! let mut buf = [0u8; 32];
23 //! let mut prio = 0u32;
24 //! let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap();
25 //! assert_eq!(prio, 1);
26 //! assert_eq!(msg_to_send, &buf[0..len]);
27 //!
28 //! mq_close(mqd1).unwrap();
29 //! mq_close(mqd0).unwrap();
30 //! ```
31 //! [Further reading and details on the C API](https://man7.org/linux/man-pages/man7/mq_overview.7.html)
32 
33 use crate::errno::Errno;
34 use crate::NixPath;
35 use crate::Result;
36 
37 use crate::sys::stat::Mode;
38 use libc::{self, mqd_t, size_t};
39 use std::mem;
40 #[cfg(any(
41     target_os = "linux",
42     target_os = "netbsd",
43     target_os = "dragonfly"
44 ))]
45 use std::os::unix::io::{
46     AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd,
47 };
48 
libc_bitflags! {
    /// Flags accepted by [`mq_open`] that select how a message queue is
    /// opened or created.
    pub struct MQ_OFlag: libc::c_int {
        /// Open the message queue for receiving messages.
        O_RDONLY;
        /// Open the queue for sending messages.
        O_WRONLY;
        /// Open the queue for both receiving and sending messages.
        O_RDWR;
        /// Create a message queue.
        O_CREAT;
        /// If set along with `O_CREAT`, `mq_open` will fail if the message
        /// queue name exists.
        O_EXCL;
        /// `mq_send` and `mq_receive` should fail with `EAGAIN` rather than
        /// wait for resources that are not currently available.
        O_NONBLOCK;
        /// Set the close-on-exec flag for the message queue descriptor.
        O_CLOEXEC;
    }
}
70 
/// Message queue attributes.
///
/// Optionally passed to [`mq_open`], returned by [`mq_getattr`], and both
/// accepted and returned by [`mq_setattr`].
#[repr(C)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct MqAttr {
    // The raw C structure; pointers to it are handed directly to libc.
    mq_attr: libc::mq_attr,
}
78 
/// Identifies an open POSIX message queue.
///
/// Values are produced by [`mq_open`] and consumed by [`mq_close`].
// A safer wrapper around libc::mqd_t, which is a pointer on some platforms.
// Deliberately not Clone, to prevent use-after-close scenarios.
#[repr(transparent)]
#[derive(Debug)]
// Copy is withheld on purpose as well; it would allow the same misuse.
#[allow(missing_copy_implementations)]
pub struct MqdT(mqd_t);
86 
// x32 compatibility: on x86_64 with 32-bit pointers the attribute members
// are 64 bits wide instead of `c_long`.
// See https://sourceware.org/bugzilla/show_bug.cgi?id=21279
/// Integer type of each member of a message queue attribute structure
/// (x32 variant: always 64 bits wide).
#[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))]
pub type mq_attr_member_t = i64;
/// Integer type of each member of a message queue attribute structure
#[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))]
pub type mq_attr_member_t = libc::c_long;
95 
96 impl MqAttr {
97     /// Create a new message queue attribute
98     ///
99     /// # Arguments
100     ///
101     /// - `mq_flags`:   Either `0` or `O_NONBLOCK`.
102     /// - `mq_maxmsg`:  Maximum number of messages on the queue.
103     /// - `mq_msgsize`: Maximum message size in bytes.
104     /// - `mq_curmsgs`: Number of messages currently in the queue.
new( mq_flags: mq_attr_member_t, mq_maxmsg: mq_attr_member_t, mq_msgsize: mq_attr_member_t, mq_curmsgs: mq_attr_member_t, ) -> MqAttr105     pub fn new(
106         mq_flags: mq_attr_member_t,
107         mq_maxmsg: mq_attr_member_t,
108         mq_msgsize: mq_attr_member_t,
109         mq_curmsgs: mq_attr_member_t,
110     ) -> MqAttr {
111         let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
112         unsafe {
113             let p = attr.as_mut_ptr();
114             (*p).mq_flags = mq_flags;
115             (*p).mq_maxmsg = mq_maxmsg;
116             (*p).mq_msgsize = mq_msgsize;
117             (*p).mq_curmsgs = mq_curmsgs;
118             MqAttr {
119                 mq_attr: attr.assume_init(),
120             }
121         }
122     }
123 
124     /// The current flags, either `0` or `O_NONBLOCK`.
flags(&self) -> mq_attr_member_t125     pub const fn flags(&self) -> mq_attr_member_t {
126         self.mq_attr.mq_flags
127     }
128 
129     /// The max number of messages that can be held by the queue
maxmsg(&self) -> mq_attr_member_t130     pub const fn maxmsg(&self) -> mq_attr_member_t {
131         self.mq_attr.mq_maxmsg
132     }
133 
134     /// The maximum size of each message (in bytes)
msgsize(&self) -> mq_attr_member_t135     pub const fn msgsize(&self) -> mq_attr_member_t {
136         self.mq_attr.mq_msgsize
137     }
138 
139     /// The number of messages currently held in the queue
curmsgs(&self) -> mq_attr_member_t140     pub const fn curmsgs(&self) -> mq_attr_member_t {
141         self.mq_attr.mq_curmsgs
142     }
143 }
144 
145 /// Open a message queue
146 ///
147 /// See also [`mq_open(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_open.html)
148 // The mode.bits() cast is only lossless on some OSes
149 #[allow(clippy::cast_lossless)]
mq_open<P>( name: &P, oflag: MQ_OFlag, mode: Mode, attr: Option<&MqAttr>, ) -> Result<MqdT> where P: ?Sized + NixPath,150 pub fn mq_open<P>(
151     name: &P,
152     oflag: MQ_OFlag,
153     mode: Mode,
154     attr: Option<&MqAttr>,
155 ) -> Result<MqdT>
156 where
157     P: ?Sized + NixPath,
158 {
159     let res = name.with_nix_path(|cstr| match attr {
160         Some(mq_attr) => unsafe {
161             libc::mq_open(
162                 cstr.as_ptr(),
163                 oflag.bits(),
164                 mode.bits() as libc::c_int,
165                 &mq_attr.mq_attr as *const libc::mq_attr,
166             )
167         },
168         None => unsafe { libc::mq_open(cstr.as_ptr(), oflag.bits()) },
169     })?;
170 
171     Errno::result(res).map(MqdT)
172 }
173 
174 /// Remove a message queue
175 ///
176 /// See also [`mq_unlink(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_unlink.html)
mq_unlink<P>(name: &P) -> Result<()> where P: ?Sized + NixPath,177 pub fn mq_unlink<P>(name: &P) -> Result<()>
178 where
179     P: ?Sized + NixPath,
180 {
181     let res =
182         name.with_nix_path(|cstr| unsafe { libc::mq_unlink(cstr.as_ptr()) })?;
183     Errno::result(res).map(drop)
184 }
185 
186 /// Close a message queue
187 ///
188 /// See also [`mq_close(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_close.html)
mq_close(mqdes: MqdT) -> Result<()>189 pub fn mq_close(mqdes: MqdT) -> Result<()> {
190     let res = unsafe { libc::mq_close(mqdes.0) };
191     Errno::result(res).map(drop)
192 }
193 
194 /// Receive a message from a message queue
195 ///
196 /// See also [`mq_receive(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_receive.html)
mq_receive( mqdes: &MqdT, message: &mut [u8], msg_prio: &mut u32, ) -> Result<usize>197 pub fn mq_receive(
198     mqdes: &MqdT,
199     message: &mut [u8],
200     msg_prio: &mut u32,
201 ) -> Result<usize> {
202     let len = message.len() as size_t;
203     let res = unsafe {
204         libc::mq_receive(
205             mqdes.0,
206             message.as_mut_ptr().cast(),
207             len,
208             msg_prio as *mut u32,
209         )
210     };
211     Errno::result(res).map(|r| r as usize)
212 }
213 
214 feature! {
215     #![feature = "time"]
216     use crate::sys::time::TimeSpec;
217     /// Receive a message from a message queue with a timeout
218     ///
219     /// See also ['mq_timedreceive(2)'](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_receive.html)
220     pub fn mq_timedreceive(
221         mqdes: &MqdT,
222         message: &mut [u8],
223         msg_prio: &mut u32,
224         abstime: &TimeSpec,
225     ) -> Result<usize> {
226         let len = message.len() as size_t;
227         let res = unsafe {
228             libc::mq_timedreceive(
229                 mqdes.0,
230                 message.as_mut_ptr().cast(),
231                 len,
232                 msg_prio as *mut u32,
233                 abstime.as_ref(),
234             )
235         };
236         Errno::result(res).map(|r| r as usize)
237     }
238 }
239 
240 /// Send a message to a message queue
241 ///
242 /// See also [`mq_send(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_send.html)
mq_send(mqdes: &MqdT, message: &[u8], msq_prio: u32) -> Result<()>243 pub fn mq_send(mqdes: &MqdT, message: &[u8], msq_prio: u32) -> Result<()> {
244     let res = unsafe {
245         libc::mq_send(mqdes.0, message.as_ptr().cast(), message.len(), msq_prio)
246     };
247     Errno::result(res).map(drop)
248 }
249 
250 /// Get message queue attributes
251 ///
252 /// See also [`mq_getattr(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_getattr.html)
mq_getattr(mqd: &MqdT) -> Result<MqAttr>253 pub fn mq_getattr(mqd: &MqdT) -> Result<MqAttr> {
254     let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
255     let res = unsafe { libc::mq_getattr(mqd.0, attr.as_mut_ptr()) };
256     Errno::result(res).map(|_| unsafe {
257         MqAttr {
258             mq_attr: attr.assume_init(),
259         }
260     })
261 }
262 
263 /// Set the attributes of the message queue. Only `O_NONBLOCK` can be set, everything else will be ignored
264 /// Returns the old attributes
265 /// It is recommend to use the `mq_set_nonblock()` and `mq_remove_nonblock()` convenience functions as they are easier to use
266 ///
267 /// [Further reading](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_setattr.html)
mq_setattr(mqd: &MqdT, newattr: &MqAttr) -> Result<MqAttr>268 pub fn mq_setattr(mqd: &MqdT, newattr: &MqAttr) -> Result<MqAttr> {
269     let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
270     let res = unsafe {
271         libc::mq_setattr(
272             mqd.0,
273             &newattr.mq_attr as *const libc::mq_attr,
274             attr.as_mut_ptr(),
275         )
276     };
277     Errno::result(res).map(|_| unsafe {
278         MqAttr {
279             mq_attr: attr.assume_init(),
280         }
281     })
282 }
283 
284 /// Convenience function.
285 /// Sets the `O_NONBLOCK` attribute for a given message queue descriptor
286 /// Returns the old attributes
287 #[allow(clippy::useless_conversion)] // Not useless on all OSes
mq_set_nonblock(mqd: &MqdT) -> Result<MqAttr>288 pub fn mq_set_nonblock(mqd: &MqdT) -> Result<MqAttr> {
289     let oldattr = mq_getattr(mqd)?;
290     let newattr = MqAttr::new(
291         mq_attr_member_t::from(MQ_OFlag::O_NONBLOCK.bits()),
292         oldattr.mq_attr.mq_maxmsg,
293         oldattr.mq_attr.mq_msgsize,
294         oldattr.mq_attr.mq_curmsgs,
295     );
296     mq_setattr(mqd, &newattr)
297 }
298 
299 /// Convenience function.
300 /// Removes `O_NONBLOCK` attribute for a given message queue descriptor
301 /// Returns the old attributes
mq_remove_nonblock(mqd: &MqdT) -> Result<MqAttr>302 pub fn mq_remove_nonblock(mqd: &MqdT) -> Result<MqAttr> {
303     let oldattr = mq_getattr(mqd)?;
304     let newattr = MqAttr::new(
305         0,
306         oldattr.mq_attr.mq_maxmsg,
307         oldattr.mq_attr.mq_msgsize,
308         oldattr.mq_attr.mq_curmsgs,
309     );
310     mq_setattr(mqd, &newattr)
311 }
312 
313 #[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
314 impl AsFd for MqdT {
315     /// Borrow the underlying message queue descriptor.
as_fd(&self) -> BorrowedFd316     fn as_fd(&self) -> BorrowedFd {
317         // SAFETY: [MqdT] will only contain a valid fd by construction.
318         unsafe { BorrowedFd::borrow_raw(self.0) }
319     }
320 }
321 
322 #[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
323 impl AsRawFd for MqdT {
324     /// Return the underlying message queue descriptor.
325     ///
326     /// Returned descriptor is a "shallow copy" of the descriptor, so it refers
327     ///  to the same underlying kernel object as `self`.
as_raw_fd(&self) -> RawFd328     fn as_raw_fd(&self) -> RawFd {
329         self.0
330     }
331 }
332 
333 #[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
334 impl FromRawFd for MqdT {
335     /// Construct an [MqdT] from [RawFd].
336     ///
337     /// # Safety
338     /// The `fd` given must be a valid and open file descriptor for a message
339     ///  queue.
from_raw_fd(fd: RawFd) -> MqdT340     unsafe fn from_raw_fd(fd: RawFd) -> MqdT {
341         MqdT(fd)
342     }
343 }
344 
345 #[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
346 impl IntoRawFd for MqdT {
347     /// Consume this [MqdT] and return a [RawFd].
into_raw_fd(self) -> RawFd348     fn into_raw_fd(self) -> RawFd {
349         self.0
350     }
351 }
352