xref: /aosp_15_r20/external/crosvm/base/src/sys/windows/stream_channel.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::sync::Arc;
7 
8 use log::error;
9 use log::warn;
10 use serde::ser::SerializeStruct;
11 use serde::Deserialize;
12 use serde::Serialize;
13 use serde::Serializer;
14 use sync::Mutex;
15 
16 use super::named_pipes;
17 use super::named_pipes::PipeConnection;
18 use super::MultiProcessMutex;
19 use super::RawDescriptor;
20 use super::Result;
21 use crate::descriptor::AsRawDescriptor;
22 use crate::CloseNotifier;
23 use crate::Event;
24 use crate::ReadNotifier;
25 
26 #[derive(Copy, Clone)]
27 pub enum FramingMode {
28     Message,
29     Byte,
30 }
31 
32 #[derive(Copy, Clone, PartialEq, Eq)]
33 pub enum BlockingMode {
34     Blocking,
35     Nonblocking,
36 }
37 
38 impl From<FramingMode> for named_pipes::FramingMode {
from(framing_mode: FramingMode) -> Self39     fn from(framing_mode: FramingMode) -> Self {
40         match framing_mode {
41             FramingMode::Message => named_pipes::FramingMode::Message,
42             FramingMode::Byte => named_pipes::FramingMode::Byte,
43         }
44     }
45 }
46 
47 impl From<BlockingMode> for named_pipes::BlockingMode {
from(blocking_mode: BlockingMode) -> Self48     fn from(blocking_mode: BlockingMode) -> Self {
49         match blocking_mode {
50             BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
51             BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
52         }
53     }
54 }
55 
56 pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
57 
58 /// An abstraction over named pipes and unix socketpairs.
59 ///
60 /// The ReadNotifier will return an event handle that is set when data is in the channel.
61 ///
62 /// In message mode, single writes larger than
63 /// `crate::windows::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
64 ///
65 /// # Notes for maintainers
66 /// 1. This struct contains extremely subtle thread safety considerations.
67 /// 2. Serialization is not derived! New fields need to be added manually.
68 #[derive(Deserialize, Debug)]
69 pub struct StreamChannel {
70     pipe_conn: named_pipes::PipeConnection,
71     write_notify: Event,
72     read_notify: Event,
73     pipe_closed: Event,
74 
75     // Held when reading on this end, to prevent additional writes from corrupting notification
76     // state.
77     remote_write_lock: MultiProcessMutex,
78 
79     // Held when a write is made on this end, so that if the remote end is reading, we wait to
80     // write to avoid corrupting notification state.
81     local_write_lock: MultiProcessMutex,
82 
83     // Held for the entire duration of a read. This enables the StreamChannel to be sync,
84     // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
85     //
86     // In practice, there is no use-case for multiple threads actually contending over
87     // reading from a single pipe through StreamChannel, so this is mostly to provide a
88     // compiler guarantee while passing the StreamChannel to/from background executors.
89     //
90     // Note that this mutex does not work across processes, so the same StreamChannel end should
91     // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
92     // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
93     // which is not well defined behavior.)
94     #[serde(skip)]
95     #[serde(default = "create_read_lock")]
96     read_lock: Arc<Mutex<()>>,
97 
98     // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
99     // channel end has been serialized. Once serialized, we know that the current end MUST NOT
100     // signal the channel has been closed when it was dropped, because a copy of it was sent to
101     // another process. It is the copy's responsibility to close the pipe.
102     #[serde(skip)]
103     #[serde(default = "create_true_mutex")]
104     is_channel_closed_on_drop: Mutex<bool>,
105 
106     // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
107     // up to that size.
108     send_buffer_size: usize,
109 }
110 
create_read_lock() -> Arc<Mutex<()>>111 fn create_read_lock() -> Arc<Mutex<()>> {
112     Arc::new(Mutex::new(()))
113 }
114 
create_true_mutex() -> Mutex<bool>115 fn create_true_mutex() -> Mutex<bool> {
116     Mutex::new(true)
117 }
118 
119 /// Serialize is manually implemented because we need to tell the local copy that a remote copy
120 /// exists, and to not send the close event. Our serialization is otherwise identical to what
121 /// derive would have generated.
122 impl Serialize for StreamChannel {
serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> where S: Serializer,123     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
124     where
125         S: Serializer,
126     {
127         let mut s = serializer.serialize_struct("StreamChannel", 7)?;
128         s.serialize_field("pipe_conn", &self.pipe_conn)?;
129         s.serialize_field("write_notify", &self.write_notify)?;
130         s.serialize_field("read_notify", &self.read_notify)?;
131         s.serialize_field("pipe_closed", &self.pipe_closed)?;
132         s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
133         s.serialize_field("local_write_lock", &self.local_write_lock)?;
134         s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
135         let ret = s.end();
136 
137         // Because this end has been serialized, the serialized copy is now responsible for setting
138         // the close event.
139         if ret.is_ok() {
140             *self.is_channel_closed_on_drop.lock() = false;
141         }
142 
143         ret
144     }
145 }
146 
147 impl Drop for StreamChannel {
drop(&mut self)148     fn drop(&mut self) {
149         if *self.is_channel_closed_on_drop.lock() {
150             if let Err(e) = self.pipe_closed.signal() {
151                 warn!("failed to notify on channel drop: {}", e);
152             }
153         }
154     }
155 }
156 
157 impl StreamChannel {
set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()>158     pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
159         // Safe because the pipe is open.
160         if nonblocking {
161             self.pipe_conn
162                 .set_blocking(&named_pipes::BlockingMode::NoWait)
163         } else {
164             self.pipe_conn
165                 .set_blocking(&named_pipes::BlockingMode::Wait)
166         }
167     }
168 
169     // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
170     // > 1 reader per end is not defined.
try_clone(&self) -> io::Result<Self>171     pub fn try_clone(&self) -> io::Result<Self> {
172         Ok(StreamChannel {
173             pipe_conn: self.pipe_conn.try_clone()?,
174             write_notify: self.write_notify.try_clone()?,
175             read_notify: self.read_notify.try_clone()?,
176             pipe_closed: self.pipe_closed.try_clone()?,
177             remote_write_lock: self.remote_write_lock.try_clone()?,
178             local_write_lock: self.local_write_lock.try_clone()?,
179             read_lock: self.read_lock.clone(),
180             is_channel_closed_on_drop: create_true_mutex(),
181             send_buffer_size: self.send_buffer_size,
182         })
183     }
184 
185     /// Gets the readable byte count. Returns zero for broken pipes since that will cause the read
186     /// notifier to be set, and for the consumer to quickly discover the broken pipe.
get_readable_byte_count(&self) -> io::Result<u32>187     fn get_readable_byte_count(&self) -> io::Result<u32> {
188         match self.pipe_conn.get_available_byte_count() {
189             Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
190             Err(e) => {
191                 error!("StreamChannel failed to get readable byte count: {}", e);
192                 Err(e)
193             }
194             Ok(byte_count) => Ok(byte_count),
195         }
196     }
197 
inner_read(&self, buf: &mut [u8]) -> io::Result<usize>198     pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
199         // We ensure concurrent read safety by holding a lock for the duration of the method.
200         // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
201         // that the notifier should be set, leading to an invalid notified/readable state which
202         // could stall readers.)
203         let _read_lock = self.read_lock.lock();
204 
205         // SAFETY:
206         // Safe because no partial reads are possible, and the underlying code bounds the
207         // read by buf's size.
208         let res = unsafe { self.pipe_conn.read(buf) };
209 
210         // The entire goal of this complex section is to avoid the need for shared memory between
211         // each channel end to synchronize the notification state. It is very subtle, modify with
212         // care.
213         loop {
214             // No other thread is reading, so we can find out, without the write lock, whether or
215             // not we need to clear the read notifier. If we don't, then we don't even have to try
216             // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
217             // side blocks on a writing with the lock held. If it looks like we do need to clear
218             // the notifier though, then we have to be sure, so we'll proceed to the next section.
219             let byte_count = self.get_readable_byte_count()?;
220             if byte_count > 0 {
221                 // It's always safe to set the read notifier here because we know there is data in
222                 // the pipe, and no one else could read it out from under us.
223                 self.read_notify.signal().map_err(|e| {
224                     io::Error::new(
225                         io::ErrorKind::Other,
226                         format!("failed to write to read notifier: {:?}", e),
227                     )
228                 })?;
229 
230                 // Notifier state has been safely synced.
231                 return res;
232             }
233 
234             // At this point, there *may* be no data in the pipe, meaning we may want to clear the
235             // notifier. Instead of just trying to acquire the lock outright which could deadlock
236             // with the writing side, we'll try with a timeout. If it fails, we know that the other
237             // side is in the middle of a write, so there either will be data in the pipe soon (a),
238             // or there won't be and we have to clear a spurious notification (b).
239             //
240             // For (a), we can safely return from the read without needing the lock, so we just come
241             // around in the loop to check again, and the loop will exit quickly.
242             //
243             // For (b) we'll return to this point and acquire the lock, as we're just waiting for
244             // the spurious notification to arrive so we can clear it (that code path is very fast),
245             // and the loop will exit.
246             //
247             // If we successfully acquire the lock though, then we can go ahead and clear the
248             // notifier if the pipe is indeed empty, because we are assured that no writes are
249             // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
250             // that's a decent balance between avoiding an unnecessary iteration, and minimizing
251             // latency.
252             if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
253                 let byte_count = self.get_readable_byte_count()?;
254                 if byte_count > 0 {
255                     // Safe because no one else can be reading from the pipe.
256                     self.read_notify.signal().map_err(|e| {
257                         io::Error::new(
258                             io::ErrorKind::Other,
259                             format!("failed to write to read notifier: {:?}", e),
260                         )
261                     })?;
262                 } else {
263                     // Safe because no other writes can be happening (_lock is held).
264                     self.read_notify.reset().map_err(|e| {
265                         io::Error::new(
266                             io::ErrorKind::Other,
267                             format!("failed to reset read notifier: {:?}", e),
268                         )
269                     })?;
270                 }
271 
272                 // Notifier state has been safely synced.
273                 return res;
274             }
275         }
276     }
277 
278     /// Exists as a workaround for Tube which does not expect its transport to be mutable,
279     /// even though io::Write requires it.
write_immutable(&self, buf: &[u8]) -> io::Result<usize>280     pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
281         if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
282             && buf.len() > self.send_buffer_size
283         {
284             return Err(io::Error::new(
285                 io::ErrorKind::Other,
286                 format!(
287                     "StreamChannel forbids message mode writes larger than the \
288                      default buffer size of {}.",
289                     self.send_buffer_size,
290                 ),
291             ));
292         }
293 
294         let _lock = self.local_write_lock.lock();
295         let res = self.pipe_conn.write(buf);
296 
297         // We can always set the write notifier because we know that the reader is in one of the
298         // following states:
299         //      1) a read is running, and it consumes these bytes, so the notification is
300         //         unnecessary. That's fine, because the reader will resync the notifier state once
301         //         it finishes reading.
302         //      2) a read has completed and is blocked on the lock. The notification state is
303         //         already correct, and the read's resync won't change that.
304         if res.is_ok() {
305             self.write_notify.signal().map_err(|e| {
306                 io::Error::new(
307                     io::ErrorKind::Other,
308                     format!("failed to write to read notifier: {:?}", e),
309                 )
310             })?;
311         }
312 
313         res
314     }
315 
316     /// This only works with empty pipes. U.B. will result if used in any other scenario.
from_pipes( pipe_a: PipeConnection, pipe_b: PipeConnection, send_buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>317     pub fn from_pipes(
318         pipe_a: PipeConnection,
319         pipe_b: PipeConnection,
320         send_buffer_size: usize,
321     ) -> Result<(StreamChannel, StreamChannel)> {
322         let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
323         let pipe_closed = Event::new()?;
324 
325         let write_lock_a = MultiProcessMutex::new()?;
326         let write_lock_b = MultiProcessMutex::new()?;
327 
328         let sock_a = StreamChannel {
329             pipe_conn: pipe_a,
330             write_notify: notify_a_write.try_clone()?,
331             read_notify: notify_b_write.try_clone()?,
332             read_lock: Arc::new(Mutex::new(())),
333             local_write_lock: write_lock_a.try_clone()?,
334             remote_write_lock: write_lock_b.try_clone()?,
335             pipe_closed: pipe_closed.try_clone()?,
336             is_channel_closed_on_drop: create_true_mutex(),
337             send_buffer_size,
338         };
339         let sock_b = StreamChannel {
340             pipe_conn: pipe_b,
341             write_notify: notify_b_write,
342             read_notify: notify_a_write,
343             read_lock: Arc::new(Mutex::new(())),
344             local_write_lock: write_lock_b,
345             remote_write_lock: write_lock_a,
346             pipe_closed,
347             is_channel_closed_on_drop: create_true_mutex(),
348             send_buffer_size,
349         };
350         Ok((sock_a, sock_b))
351     }
352 
353     /// Create a pair with a specific buffer size. Note that this is the only way to send messages
354     /// larger than the default named pipe buffer size.
pair_with_buffer_size( blocking_mode: BlockingMode, framing_mode: FramingMode, buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>355     pub fn pair_with_buffer_size(
356         blocking_mode: BlockingMode,
357         framing_mode: FramingMode,
358         buffer_size: usize,
359     ) -> Result<(StreamChannel, StreamChannel)> {
360         let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
361             &named_pipes::FramingMode::from(framing_mode),
362             &named_pipes::BlockingMode::from(blocking_mode),
363             0,
364             buffer_size,
365             false,
366         )?;
367         Self::from_pipes(pipe_a, pipe_b, buffer_size)
368     }
369     /// Creates a cross platform channel pair.
370     /// On Windows the result is in the form (server, client).
pair( blocking_mode: BlockingMode, framing_mode: FramingMode, ) -> Result<(StreamChannel, StreamChannel)>371     pub fn pair(
372         blocking_mode: BlockingMode,
373         framing_mode: FramingMode,
374     ) -> Result<(StreamChannel, StreamChannel)> {
375         let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
376             &named_pipes::FramingMode::from(framing_mode),
377             &named_pipes::BlockingMode::from(blocking_mode),
378             0,
379             DEFAULT_BUFFER_SIZE,
380             false,
381         )?;
382         Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
383     }
384 
385     /// Blocks until the pipe buffer is empty.
386     /// NOTE: that this will only work for server pipes on Windows.
flush_blocking(&self) -> io::Result<()>387     pub fn flush_blocking(&self) -> io::Result<()> {
388         self.pipe_conn.flush_data_blocking()
389     }
390 
get_read_notifier_event(&self) -> &Event391     pub(crate) fn get_read_notifier_event(&self) -> &Event {
392         &self.read_notify
393     }
394 
get_close_notifier_event(&self) -> &Event395     pub(crate) fn get_close_notifier_event(&self) -> &Event {
396         &self.pipe_closed
397     }
398 }
399 
400 impl io::Write for StreamChannel {
write(&mut self, buf: &[u8]) -> io::Result<usize>401     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
402         self.write_immutable(buf)
403     }
flush(&mut self) -> io::Result<()>404     fn flush(&mut self) -> io::Result<()> {
405         // There is no userspace buffering inside crosvm to flush for named pipes. We write
406         // directly to the named pipe using WriteFile.
407         Ok(())
408     }
409 }
410 
411 impl AsRawDescriptor for &StreamChannel {
as_raw_descriptor(&self) -> RawDescriptor412     fn as_raw_descriptor(&self) -> RawDescriptor {
413         self.pipe_conn.as_raw_descriptor()
414     }
415 }
416 
417 impl ReadNotifier for StreamChannel {
418     /// Returns a RawDescriptor that can be polled for reads using WaitContext.
get_read_notifier(&self) -> &dyn AsRawDescriptor419     fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
420         &self.read_notify
421     }
422 }
423 
424 impl CloseNotifier for StreamChannel {
get_close_notifier(&self) -> &dyn AsRawDescriptor425     fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
426         &self.pipe_closed
427     }
428 }
429 
430 impl io::Read for StreamChannel {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>431     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
432         self.inner_read(buf)
433     }
434 }
435 
436 impl io::Read for &StreamChannel {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>437     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
438         self.inner_read(buf)
439     }
440 }
441 
442 impl AsRawDescriptor for StreamChannel {
as_raw_descriptor(&self) -> RawDescriptor443     fn as_raw_descriptor(&self) -> RawDescriptor {
444         (&self).as_raw_descriptor()
445     }
446 }
447 
448 #[cfg(test)]
449 mod test {
450     use std::io::Read;
451     use std::io::Write;
452     use std::time::Duration;
453 
454     use super::super::EventContext;
455     use super::super::EventTrigger;
456     use super::*;
457     use crate::EventToken;
458     use crate::ReadNotifier;
459 
460     #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
461     enum Token {
462         ReceivedData,
463     }
464 
465     const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);
466 
467     #[test]
test_read_notifies_multiple_writes()468     fn test_read_notifies_multiple_writes() {
469         let (mut sender, mut receiver) =
470             StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
471         sender.write_all(&[1, 2]).unwrap();
472 
473         // Wait for the write to arrive.
474         let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
475             receiver.get_read_notifier(),
476             Token::ReceivedData,
477         )])
478         .unwrap();
479         assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
480 
481         // Read just one byte. This leaves another byte in the pipe.
482         let mut recv_buffer = [0u8; 1];
483         let size = receiver.read(&mut recv_buffer).unwrap();
484         assert_eq!(size, 1);
485         assert_eq!(recv_buffer[0], 1);
486 
487         // The notifier should still be set, because the pipe has data.
488         assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
489         let size = receiver.read(&mut recv_buffer).unwrap();
490         assert_eq!(size, 1);
491         assert_eq!(recv_buffer[0], 2);
492     }
493 
494     #[test]
test_blocked_writer_wont_deadlock()495     fn test_blocked_writer_wont_deadlock() {
496         let (mut writer, mut reader) =
497             StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
498                 .unwrap();
499         const NUM_OPS: usize = 100;
500 
501         // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
502         // 100x before we run into a blocking write, so that's what we do here. This makes sense
503         // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
504         // is supposed to be handled by the kernel.
505         let writer = std::thread::spawn(move || {
506             let buf = [0u8; 100];
507             for _ in 0..NUM_OPS {
508                 assert_eq!(writer.write(&buf).unwrap(), buf.len());
509             }
510             writer
511         });
512 
513         // The test passes if the reader can read (this used to deadlock).
514         let mut buf = [0u8; 100];
515         for _ in 0..NUM_OPS {
516             assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
517         }
518 
519         // Writer must exit cleanly.
520         writer.join().unwrap();
521     }
522 
523     #[test]
test_non_blocking_pair()524     fn test_non_blocking_pair() {
525         let (mut sender, mut receiver) =
526             StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
527 
528         sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();
529 
530         // Wait for the data to arrive.
531         let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
532             receiver.get_read_notifier(),
533             Token::ReceivedData,
534         )])
535         .unwrap();
536         let events = event_ctx.wait().unwrap();
537         let tokens: Vec<Token> = events
538             .iter()
539             .filter(|e| e.is_readable)
540             .map(|e| e.token)
541             .collect();
542         assert_eq!(tokens, vec! {Token::ReceivedData});
543 
544         // Smaller than what we sent so we get multiple chunks
545         let mut recv_buffer: [u8; 4] = [0; 4];
546 
547         let mut size = receiver.read(&mut recv_buffer).unwrap();
548         assert_eq!(size, 4);
549         assert_eq!(recv_buffer, [75, 77, 54, 82]);
550 
551         size = receiver.read(&mut recv_buffer).unwrap();
552         assert_eq!(size, 2);
553         assert_eq!(recv_buffer[0..2], [76, 65]);
554 
555         // Now that we've polled for & received all data, polling again should show no events.
556         assert_eq!(
557             event_ctx
558                 .wait_timeout(std::time::Duration::new(0, 0))
559                 .unwrap()
560                 .len(),
561             0
562         );
563     }
564 
565     #[test]
test_non_blocking_pair_error_no_data()566     fn test_non_blocking_pair_error_no_data() {
567         let (mut sender, mut receiver) =
568             StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
569         receiver
570             .set_nonblocking(true)
571             .expect("Failed to set receiver to nonblocking mode.");
572 
573         sender.write_all(&[75, 77]).unwrap();
574 
575         // Wait for the data to arrive.
576         let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
577             receiver.get_read_notifier(),
578             Token::ReceivedData,
579         )])
580         .unwrap();
581         let events = event_ctx.wait().unwrap();
582         let tokens: Vec<Token> = events
583             .iter()
584             .filter(|e| e.is_readable)
585             .map(|e| e.token)
586             .collect();
587         assert_eq!(tokens, vec! {Token::ReceivedData});
588 
589         // We only read 2 bytes, even though we requested 4 bytes.
590         let mut recv_buffer: [u8; 4] = [0; 4];
591         let size = receiver.read(&mut recv_buffer).unwrap();
592         assert_eq!(size, 2);
593         assert_eq!(recv_buffer, [75, 77, 00, 00]);
594 
595         // Further reads should encounter an error since there is no available data and this is a
596         // non blocking pipe.
597         assert!(receiver.read(&mut recv_buffer).is_err());
598     }
599 }
600