1 use cfg_if::cfg_if;
2 use std::str;
3 
4 use nix::errno::Errno;
5 use nix::mqueue::{
6     mq_attr_member_t, mq_close, mq_open, mq_receive, mq_send, mq_timedreceive,
7 };
8 use nix::mqueue::{MQ_OFlag, MqAttr};
9 use nix::sys::stat::Mode;
10 use nix::sys::time::{TimeSpec, TimeValLike};
11 use nix::time::{clock_gettime, ClockId};
12 
// Written as a macro instead of a function so that a failing assertion is
// reported at the location of the caller.
macro_rules! assert_attr_eq {
    ($actual:ident, $expected:ident) => {
        cfg_if! {
            if #[cfg(any(target_os = "dragonfly", target_os = "netbsd"))] {
                // NetBSD (and the systems that inherit its implementation) report
                // extra flags in the attributes read back, such as the ones given
                // through oflag. Only require that the expected flag bits are
                // present, then compare the remaining fields one by one.
                assert_eq!($actual.flags() & $expected.flags(), $expected.flags());
                assert_eq!($actual.maxmsg(), $expected.maxmsg());
                assert_eq!($actual.msgsize(), $expected.msgsize());
                assert_eq!($actual.curmsgs(), $expected.curmsgs());
            } else {
                // Everywhere else both attribute sets have to be equal as a whole.
                assert_eq!($actual, $expected);
            }
        }
    };
}
31 
// Round-trips a single message: it is sent through a write-only descriptor and
// read back through a second, read-only descriptor opened on the same queue
// name. Both the payload and the priority must come back unchanged.
//
// The test returns early without failing when `mq_open` reports `ENOSYS`.
#[test]
fn test_mq_send_and_receive() {
    const MSG_SIZE: mq_attr_member_t = 32;
    let attr = MqAttr::new(0, 10, MSG_SIZE, 0);
    let mq_name = "/a_nix_test_queue";

    let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
    let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
    let r0 = mq_open(mq_name, oflag0, mode, Some(&attr));
    if let Err(Errno::ENOSYS) = r0 {
        println!("message queues not supported or module not loaded?");
        return;
    }
    let mqd0 = r0.unwrap();
    let msg_to_send = "msg_1";
    mq_send(&mqd0, msg_to_send.as_bytes(), 1).unwrap();

    let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY;
    let mqd1 = mq_open(mq_name, oflag1, mode, Some(&attr)).unwrap();
    // The receive buffer has to be at least as large as the queue's message
    // size, so its length is derived from MSG_SIZE rather than repeated as a
    // separate literal. MSG_SIZE is a small positive constant, so the cast is
    // lossless.
    let mut buf = [0u8; MSG_SIZE as usize];
    let mut prio = 0u32;
    let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap();
    assert_eq!(prio, 1);

    mq_close(mqd1).unwrap();
    mq_close(mqd0).unwrap();
    // Only the first `len` bytes of the buffer belong to the received message.
    assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap());
}
60 
// Same round trip as the plain send/receive test, but the message is read
// back with `mq_timedreceive` and an absolute deadline one second from now on
// CLOCK_REALTIME.
//
// The queue name is not used by any other test in this file, so tests running
// in parallel cannot consume each other's messages.
//
// The test returns early without failing when `mq_open` reports `ENOSYS`.
#[test]
fn test_mq_timedreceive() {
    const MSG_SIZE: mq_attr_member_t = 32;
    let attr = MqAttr::new(0, 10, MSG_SIZE, 0);
    let mq_name = "/b_nix_test_queue";

    let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
    let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
    let r0 = mq_open(mq_name, oflag0, mode, Some(&attr));
    if let Err(Errno::ENOSYS) = r0 {
        println!("message queues not supported or module not loaded?");
        return;
    }
    let mqd0 = r0.unwrap();
    let msg_to_send = "msg_1";
    mq_send(&mqd0, msg_to_send.as_bytes(), 1).unwrap();

    let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY;
    let mqd1 = mq_open(mq_name, oflag1, mode, Some(&attr)).unwrap();
    // The receive buffer has to be at least as large as the queue's message
    // size, so its length is derived from MSG_SIZE rather than repeated as a
    // separate literal. MSG_SIZE is a small positive constant, so the cast is
    // lossless.
    let mut buf = [0u8; MSG_SIZE as usize];
    let mut prio = 0u32;
    // Absolute deadline: current wall-clock time plus one second.
    let abstime =
        clock_gettime(ClockId::CLOCK_REALTIME).unwrap() + TimeSpec::seconds(1);
    let len = mq_timedreceive(&mqd1, &mut buf, &mut prio, &abstime).unwrap();
    assert_eq!(prio, 1);

    mq_close(mqd1).unwrap();
    mq_close(mqd0).unwrap();
    // Only the first `len` bytes of the buffer belong to the received message.
    assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap());
}
91 
// Opens a queue with known attributes and checks that `mq_getattr` reports the
// same attributes for the resulting descriptor.
//
// The test returns early without failing when `mq_open` reports `ENOSYS`.
#[test]
fn test_mq_getattr() {
    use nix::mqueue::mq_getattr;
    const MSG_SIZE: mq_attr_member_t = 32;

    let queue_name = "/attr_test_get_attr";
    let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
    let open_flags = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
    let perms = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;

    let queue = match mq_open(queue_name, open_flags, perms, Some(&initial_attr)) {
        Err(Errno::ENOSYS) => {
            println!("message queues not supported or module not loaded?");
            return;
        }
        // Any other error is a genuine failure and panics here.
        opened => opened.unwrap(),
    };

    let read_attr = mq_getattr(&queue).unwrap();
    assert_attr_eq!(read_attr, initial_attr);
    mq_close(queue).unwrap();
}
111 
// FIXME: Fix failures for mips in QEMU
//
// Exercises `mq_setattr`: the call must hand back the previous attributes, must
// not apply a full set of new attributes, and must apply the O_NONBLOCK flag.
//
// The test returns early without failing when `mq_open` reports `ENOSYS`.
#[test]
#[cfg_attr(
    all(
        qemu,
        any(
            target_arch = "mips",
            target_arch = "mips32r6",
            target_arch = "mips64",
            target_arch = "mips64r6"
        )
    ),
    ignore
)]
fn test_mq_setattr() {
    use nix::mqueue::{mq_getattr, mq_setattr};
    const MSG_SIZE: mq_attr_member_t = 32;

    let queue_name = "/attr_test_get_attr";
    let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
    let open_flags = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
    let perms = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;

    let queue = match mq_open(queue_name, open_flags, perms, Some(&initial_attr)) {
        Err(Errno::ENOSYS) => {
            println!("message queues not supported or module not loaded?");
            return;
        }
        // Any other error is a genuine failure and panics here.
        opened => opened.unwrap(),
    };

    // Request a different capacity, message size and message count; the call
    // returns the attributes that were in effect before it.
    let requested_attr = MqAttr::new(0, 20, MSG_SIZE * 2, 100);
    let previous_attr = mq_setattr(&queue, &requested_attr).unwrap();
    assert_attr_eq!(previous_attr, initial_attr);

    // According to the Linux man page, O_NONBLOCK is the only thing that can be
    // set this way, so the requested values above must not have been applied.
    #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd")))]
    {
        let attr_after_request = mq_getattr(&queue).unwrap();
        assert_ne!(attr_after_request, requested_attr);
    }

    let nonblocking_attr = MqAttr::new(
        MQ_OFlag::O_NONBLOCK.bits() as mq_attr_member_t,
        10,
        MSG_SIZE,
        0,
    );
    mq_setattr(&queue, &nonblocking_attr).unwrap();
    let attr_after_nonblock = mq_getattr(&queue).unwrap();

    // With O_NONBLOCK set, the attributes no longer equal the initial ones.
    #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd")))]
    {
        assert_ne!(attr_after_nonblock, initial_attr);
    }
    assert_attr_eq!(attr_after_nonblock, nonblocking_attr);
    mq_close(queue).unwrap();
}
169 
// FIXME: Fix failures for mips in QEMU
//
// Checks that `mq_set_nonblock` sets the O_NONBLOCK bit in the flags reported by
// `mq_getattr`, and that `mq_remove_nonblock` clears it again.
//
// The test returns early without failing when `mq_open` reports `ENOSYS`.
#[test]
#[cfg_attr(
    all(
        qemu,
        any(
            target_arch = "mips",
            target_arch = "mips32r6",
            target_arch = "mips64",
            target_arch = "mips64r6"
        )
    ),
    ignore
)]
fn test_mq_set_nonblocking() {
    use nix::mqueue::{mq_getattr, mq_remove_nonblock, mq_set_nonblock};
    const MSG_SIZE: mq_attr_member_t = 32;

    let queue_name = "/attr_test_get_attr";
    let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
    let open_flags = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
    let perms = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;

    let queue = match mq_open(queue_name, open_flags, perms, Some(&initial_attr)) {
        Err(Errno::ENOSYS) => {
            println!("message queues not supported or module not loaded?");
            return;
        }
        // Any other error is a genuine failure and panics here.
        opened => opened.unwrap(),
    };

    // O_NONBLOCK expressed in the integer type used for attribute flags.
    let nonblock_mask = MQ_OFlag::O_NONBLOCK.bits() as mq_attr_member_t;

    mq_set_nonblock(&queue).unwrap();
    let flags_when_set = mq_getattr(&queue).unwrap().flags();
    assert_eq!(flags_when_set & nonblock_mask, nonblock_mask);

    mq_remove_nonblock(&queue).unwrap();
    let flags_when_cleared = mq_getattr(&queue).unwrap().flags();
    assert_eq!(flags_when_cleared & nonblock_mask, 0);

    mq_close(queue).unwrap();
}
206 
// Checks `mq_unlink`: removing the name of an open queue succeeds, and removing
// a name that no longer exists fails with `ENOENT`.
//
// The test returns early without failing when `mq_open` reports `ENOSYS`.
#[test]
fn test_mq_unlink() {
    use nix::mqueue::mq_unlink;
    const MSG_SIZE: mq_attr_member_t = 32;

    let queue_name = "/mq_unlink_test";
    let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
    let open_flags = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
    let perms = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;

    let queue = match mq_open(queue_name, open_flags, perms, Some(&initial_attr)) {
        Err(Errno::ENOSYS) => {
            println!("message queues not supported or module not loaded?");
            return;
        }
        // Any other error is a genuine failure and panics here.
        opened => opened.unwrap(),
    };

    // The descriptor is still open at this point; unlinking the name must work.
    assert_eq!(mq_unlink(queue_name), Ok(()));

    // Linux and others drop the queue name right away, so a second unlink
    // already fails there. NetBSD (and the systems that inherit its
    // implementation) keep the name until every reference is closed, so the
    // check is skipped on those targets.
    #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd")))]
    {
        let mq_name_not_opened = "/mq_unlink_test";
        assert_eq!(mq_unlink(mq_name_not_opened), Err(Errno::ENOENT));
    }

    // Once the last descriptor is closed the name has to be gone.
    mq_close(queue).unwrap();
    assert_eq!(mq_unlink(queue_name), Err(Errno::ENOENT));
}
240