// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::io;
use std::sync::Arc;

use log::error;
use log::warn;
use serde::ser::SerializeStruct;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use sync::Mutex;

use super::named_pipes;
use super::named_pipes::PipeConnection;
use super::MultiProcessMutex;
use super::RawDescriptor;
use super::Result;
use crate::descriptor::AsRawDescriptor;
use crate::CloseNotifier;
use crate::Event;
use crate::ReadNotifier;

#[derive(Copy, Clone)]
pub enum FramingMode {
    Message,
    Byte,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum BlockingMode {
    Blocking,
    Nonblocking,
}

impl From<FramingMode> for named_pipes::FramingMode {
    fn from(framing_mode: FramingMode) -> Self {
        match framing_mode {
            FramingMode::Message => named_pipes::FramingMode::Message,
            FramingMode::Byte => named_pipes::FramingMode::Byte,
        }
    }
}

impl From<BlockingMode> for named_pipes::BlockingMode {
    fn from(blocking_mode: BlockingMode) -> Self {
        match blocking_mode {
            BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
            BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
        }
    }
}

pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// An abstraction over named pipes and unix socketpairs.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than the channel's send buffer size
/// (`DEFAULT_BUFFER_SIZE` unless the pair was created via `pair_with_buffer_size`)
/// are not permitted.
///
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
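///
/// # Example
///
/// A minimal usage sketch (marked `ignore`: illustrative only, with error
/// handling elided via `?`):
///
/// ```ignore
/// use std::io::{Read, Write};
///
/// // On Windows, `pair` returns (server, client).
/// let (mut server, mut client) =
///     StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte)?;
/// server.write_all(b"hello")?;
///
/// // The read notifier is signaled while data is in the channel, so it can be
/// // registered with a WaitContext/EventContext before reading.
/// let mut buf = [0u8; 5];
/// client.read_exact(&mut buf)?;
/// assert_eq!(&buf, b"hello");
/// ```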
#[derive(Deserialize, Debug)]
pub struct StreamChannel {
    pipe_conn: named_pipes::PipeConnection,
    write_notify: Event,
    read_notify: Event,
    pipe_closed: Event,

    // Held when reading on this end, to prevent additional writes from corrupting notification
    // state.
    remote_write_lock: MultiProcessMutex,

    // Held when a write is made on this end, so that if the remote end is reading, we wait to
    // write to avoid corrupting notification state.
    local_write_lock: MultiProcessMutex,

    // Held for the entire duration of a read. This enables the StreamChannel to be sync,
    // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
    //
    // In practice, there is no use-case for multiple threads actually contending over
    // reading from a single pipe through StreamChannel, so this is mostly to provide a
    // compiler guarantee while passing the StreamChannel to/from background executors.
    //
    // Note that this mutex does not work across processes, so the same StreamChannel end should
    // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
    // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
    // which is not well defined behavior.)
    #[serde(skip)]
    #[serde(default = "create_read_lock")]
    read_lock: Arc<Mutex<()>>,

    // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
    // channel end has been serialized. Once serialized, we know that the current end MUST NOT
    // signal that the channel has been closed when it is dropped, because a copy of it was sent to
    // another process. It is the copy's responsibility to close the pipe.
    #[serde(skip)]
    #[serde(default = "create_true_mutex")]
    is_channel_closed_on_drop: Mutex<bool>,

    // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
    // up to that size.
    send_buffer_size: usize,
}

fn create_read_lock() -> Arc<Mutex<()>> {
    Arc::new(Mutex::new(()))
}

fn create_true_mutex() -> Mutex<bool> {
    Mutex::new(true)
}

/// Serialize is manually implemented because we need to tell the local copy that a remote copy
/// exists, and to not send the close event. Our serialization is otherwise identical to what
/// derive would have generated.
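///
/// A sketch of the handoff this implies (the names `channel` and `serializer`
/// are hypothetical; in practice serialization happens through a
/// descriptor-aware transport such as crosvm's Tube):
///
/// ```ignore
/// channel.serialize(serializer)?;
/// // After a successful serialize, dropping this end will NOT signal
/// // `pipe_closed`; the deserialized copy in the receiving process now owns
/// // that responsibility.
/// drop(channel);
/// ```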
impl Serialize for StreamChannel {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut s = serializer.serialize_struct("StreamChannel", 7)?;
        s.serialize_field("pipe_conn", &self.pipe_conn)?;
        s.serialize_field("write_notify", &self.write_notify)?;
        s.serialize_field("read_notify", &self.read_notify)?;
        s.serialize_field("pipe_closed", &self.pipe_closed)?;
        s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
        s.serialize_field("local_write_lock", &self.local_write_lock)?;
        s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
        let ret = s.end();

        // Because this end has been serialized, the serialized copy is now responsible for setting
        // the close event.
        if ret.is_ok() {
            *self.is_channel_closed_on_drop.lock() = false;
        }

        ret
    }
}

impl Drop for StreamChannel {
    fn drop(&mut self) {
        if *self.is_channel_closed_on_drop.lock() {
            if let Err(e) = self.pipe_closed.signal() {
                warn!("failed to notify on channel drop: {}", e);
            }
        }
    }
}

impl StreamChannel {
    pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
        // Safe because the pipe is open.
        if nonblocking {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::NoWait)
        } else {
            self.pipe_conn
                .set_blocking(&named_pipes::BlockingMode::Wait)
        }
    }

    // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
    // > 1 reader per end is not defined.
    pub fn try_clone(&self) -> io::Result<Self> {
        Ok(StreamChannel {
            pipe_conn: self.pipe_conn.try_clone()?,
            write_notify: self.write_notify.try_clone()?,
            read_notify: self.read_notify.try_clone()?,
            pipe_closed: self.pipe_closed.try_clone()?,
            remote_write_lock: self.remote_write_lock.try_clone()?,
            local_write_lock: self.local_write_lock.try_clone()?,
            read_lock: self.read_lock.clone(),
            is_channel_closed_on_drop: create_true_mutex(),
            send_buffer_size: self.send_buffer_size,
        })
    }

    /// Gets the readable byte count. Returns zero for broken pipes, since in that case the read
    /// notifier will be set and the consumer will quickly discover the broken pipe.
    fn get_readable_byte_count(&self) -> io::Result<u32> {
        match self.pipe_conn.get_available_byte_count() {
            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
            Err(e) => {
                error!("StreamChannel failed to get readable byte count: {}", e);
                Err(e)
            }
            Ok(byte_count) => Ok(byte_count),
        }
    }

    pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        // We ensure concurrent read safety by holding a lock for the duration of the method.
        // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
        // that the notifier should be set, leading to an invalid notified/readable state which
        // could stall readers.)
        let _read_lock = self.read_lock.lock();

        // SAFETY:
        // Safe because no partial reads are possible, and the underlying code bounds the
        // read by buf's size.
        let res = unsafe { self.pipe_conn.read(buf) };

        // The entire goal of this complex section is to avoid the need for shared memory between
        // each channel end to synchronize the notification state. It is very subtle, modify with
        // care.
        loop {
            // No other thread is reading, so we can find out, without the write lock, whether or
            // not we need to clear the read notifier. If we don't, then we don't even have to try
            // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
            // side blocks on a write with the lock held. If it looks like we do need to clear
            // the notifier though, then we have to be sure, so we'll proceed to the next section.
            let byte_count = self.get_readable_byte_count()?;
            if byte_count > 0 {
                // It's always safe to set the read notifier here because we know there is data in
                // the pipe, and no one else could read it out from under us.
                self.read_notify.signal().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("failed to write to read notifier: {:?}", e),
                    )
                })?;

                // Notifier state has been safely synced.
                return res;
            }

            // At this point, there *may* be no data in the pipe, meaning we may want to clear the
            // notifier. Instead of just trying to acquire the lock outright, which could deadlock
            // with the writing side, we'll try with a timeout. If it fails, we know that the other
            // side is in the middle of a write, so there either will be data in the pipe soon (a),
            // or there won't be and we have to clear a spurious notification (b).
            //
            // For (a), we can safely return from the read without needing the lock, so we just come
            // around in the loop to check again, and the loop will exit quickly.
            //
            // For (b) we'll return to this point and acquire the lock, as we're just waiting for
            // the spurious notification to arrive so we can clear it (that code path is very fast),
            // and the loop will exit.
            //
            // If we successfully acquire the lock though, then we can go ahead and clear the
            // notifier if the pipe is indeed empty, because we are assured that no writes are
            // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
            // that's a decent balance between avoiding an unnecessary iteration, and minimizing
            // latency.
            if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
                let byte_count = self.get_readable_byte_count()?;
                if byte_count > 0 {
                    // Safe because no one else can be reading from the pipe.
                    self.read_notify.signal().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to write to read notifier: {:?}", e),
                        )
                    })?;
                } else {
                    // Safe because no other writes can be happening (_lock is held).
                    self.read_notify.reset().map_err(|e| {
                        io::Error::new(
                            io::ErrorKind::Other,
                            format!("failed to reset read notifier: {:?}", e),
                        )
                    })?;
                }

                // Notifier state has been safely synced.
                return res;
            }
        }
    }

    /// Exists as a workaround for `Tube`, which does not expect its transport to be mutable,
    /// even though `io::Write` requires it.
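    ///
    /// A sketch of the intended call pattern (`channel` is hypothetical; the
    /// point is that no `&mut` borrow is needed, unlike `io::Write::write`):
    ///
    /// ```ignore
    /// let shared: &StreamChannel = &channel;
    /// let written = shared.write_immutable(b"payload")?;
    /// ```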
    pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
        if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
            && buf.len() > self.send_buffer_size
        {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "StreamChannel forbids message mode writes larger than the \
                     channel's send buffer size of {}.",
                    self.send_buffer_size,
                ),
            ));
        }

        let _lock = self.local_write_lock.lock();
        let res = self.pipe_conn.write(buf);

        // We can always set the write notifier because we know that the reader is in one of the
        // following states:
        //      1) a read is running, and it consumes these bytes, so the notification is
        //         unnecessary. That's fine, because the reader will resync the notifier state once
        //         it finishes reading.
        //      2) a read has completed and is blocked on the lock. The notification state is
        //         already correct, and the read's resync won't change that.
        if res.is_ok() {
            self.write_notify.signal().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("failed to write to read notifier: {:?}", e),
                )
            })?;
        }

        res
    }

    /// This only works with empty pipes. Undefined behavior will result if used in any other
    /// scenario.
    pub fn from_pipes(
        pipe_a: PipeConnection,
        pipe_b: PipeConnection,
        send_buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
        let pipe_closed = Event::new()?;

        let write_lock_a = MultiProcessMutex::new()?;
        let write_lock_b = MultiProcessMutex::new()?;

        let sock_a = StreamChannel {
            pipe_conn: pipe_a,
            write_notify: notify_a_write.try_clone()?,
            read_notify: notify_b_write.try_clone()?,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_a.try_clone()?,
            remote_write_lock: write_lock_b.try_clone()?,
            pipe_closed: pipe_closed.try_clone()?,
            is_channel_closed_on_drop: create_true_mutex(),
            send_buffer_size,
        };
        let sock_b = StreamChannel {
            pipe_conn: pipe_b,
            write_notify: notify_b_write,
            read_notify: notify_a_write,
            read_lock: Arc::new(Mutex::new(())),
            local_write_lock: write_lock_b,
            remote_write_lock: write_lock_a,
            pipe_closed,
            is_channel_closed_on_drop: create_true_mutex(),
            send_buffer_size,
        };
        Ok((sock_a, sock_b))
    }

    /// Create a pair with a specific buffer size. Note that this is the only way to send messages
    /// larger than the default named pipe buffer size.
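    ///
    /// For example, a sketch (the 1 MiB size is illustrative, not a
    /// recommendation):
    ///
    /// ```ignore
    /// // Permit message-mode writes up to 1 MiB instead of DEFAULT_BUFFER_SIZE.
    /// let (server, client) = StreamChannel::pair_with_buffer_size(
    ///     BlockingMode::Blocking,
    ///     FramingMode::Message,
    ///     1024 * 1024,
    /// )?;
    /// ```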
    pub fn pair_with_buffer_size(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
        buffer_size: usize,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            buffer_size,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, buffer_size)
    }

    /// Creates a cross platform channel pair.
    /// On Windows the result is in the form (server, client).
    pub fn pair(
        blocking_mode: BlockingMode,
        framing_mode: FramingMode,
    ) -> Result<(StreamChannel, StreamChannel)> {
        let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
            &named_pipes::FramingMode::from(framing_mode),
            &named_pipes::BlockingMode::from(blocking_mode),
            0,
            DEFAULT_BUFFER_SIZE,
            false,
        )?;
        Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
    }

    /// Blocks until the pipe buffer is empty.
    /// NOTE: this will only work for server pipes on Windows.
    pub fn flush_blocking(&self) -> io::Result<()> {
        self.pipe_conn.flush_data_blocking()
    }

    pub(crate) fn get_read_notifier_event(&self) -> &Event {
        &self.read_notify
    }

    pub(crate) fn get_close_notifier_event(&self) -> &Event {
        &self.pipe_closed
    }
}

impl io::Write for StreamChannel {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_immutable(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        // There is no userspace buffering inside crosvm to flush for named pipes. We write
        // directly to the named pipe using WriteFile.
        Ok(())
    }
}

impl AsRawDescriptor for &StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.pipe_conn.as_raw_descriptor()
    }
}

impl ReadNotifier for StreamChannel {
    /// Returns a RawDescriptor that can be polled for reads using WaitContext.
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        &self.read_notify
    }
}

impl CloseNotifier for StreamChannel {
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        &self.pipe_closed
    }
}

impl io::Read for StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl io::Read for &StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}

impl AsRawDescriptor for StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        (&self).as_raw_descriptor()
    }
}

#[cfg(test)]
mod test {
    use std::io::Read;
    use std::io::Write;
    use std::time::Duration;

    use super::super::EventContext;
    use super::super::EventTrigger;
    use super::*;
    use crate::EventToken;
    use crate::ReadNotifier;

    #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        sender.write_all(&[1, 2]).unwrap();

        // Wait for the write to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

        // Read just one byte. This leaves another byte in the pipe.
        let mut recv_buffer = [0u8; 1];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 1);

        // The notifier should still be set, because the pipe has data.
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 2);
    }

    #[test]
    fn test_blocked_writer_wont_deadlock() {
        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();
        const NUM_OPS: usize = 100;

        // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
        // 100x before we run into a blocking write, so that's what we do here. This makes sense
        // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
        // is supposed to be handled by the kernel.
        let writer = std::thread::spawn(move || {
            let buf = [0u8; 100];
            for _ in 0..NUM_OPS {
                assert_eq!(writer.write(&buf).unwrap(), buf.len());
            }
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut buf = [0u8; 100];
        for _ in 0..NUM_OPS {
            assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
        }

        // Writer must exit cleanly.
        writer.join().unwrap();
    }

    #[test]
    fn test_non_blocking_pair() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();

        sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // Smaller than what we sent so we get multiple chunks
        let mut recv_buffer: [u8; 4] = [0; 4];

        let mut size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);

        // Now that we've polled for & received all data, polling again should show no events.
        assert_eq!(
            event_ctx
                .wait_timeout(std::time::Duration::new(0, 0))
                .unwrap()
                .len(),
            0
        );
    }

    #[test]
    fn test_non_blocking_pair_error_no_data() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
        receiver
            .set_nonblocking(true)
            .expect("Failed to set receiver to nonblocking mode.");

        sender.write_all(&[75, 77]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec![Token::ReceivedData]);

        // We only read 2 bytes, even though we requested 4 bytes.
        let mut recv_buffer: [u8; 4] = [0; 4];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer, [75, 77, 0, 0]);

        // Further reads should encounter an error since there is no available data and this is a
        // non blocking pipe.
        assert!(receiver.read(&mut recv_buffer).is_err());
    }
}