1 // Copyright 2022 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::io; 6 use std::io::Write; 7 use std::thread; 8 use std::thread::JoinHandle; 9 use std::time::Duration; 10 11 use base::error; 12 use base::named_pipes::PipeConnection; 13 use base::AsRawDescriptor; 14 use base::Event; 15 use base::EventToken; 16 use base::FileSync; 17 use base::RawDescriptor; 18 use base::Result; 19 use base::TimerTrait; 20 use base::WaitContext; 21 use hypervisor::ProtectionType; 22 use winapi::um::ioapiset::CancelIoEx; 23 24 use crate::bus::BusDevice; 25 use crate::serial_device::SerialInput; 26 use crate::serial_device::SerialOptions; 27 use crate::sys::serial_device::SerialDevice; 28 use crate::Serial; 29 30 // TODO(b/234469655): Remove type alias once ReadNotifier is implemented for 31 // PipeConnection. 32 pub(crate) type InStreamType = Box<PipeConnection>; 33 34 /// Windows specific paramters for the serial device. 35 pub struct SystemSerialParams { 36 pub in_stream: Option<InStreamType>, 37 pub sync: Option<Box<dyn FileSync + Send>>, 38 pub sync_thread: Option<JoinHandle<SyncWorker>>, 39 pub kill_evt: Option<Event>, 40 } 41 42 impl Serial { 43 // Spawn the worker thread if it hasn't been spawned yet. handle_sync_thread(&mut self)44 pub(in crate::serial) fn handle_sync_thread(&mut self) { 45 if self.system_params.sync.is_some() { 46 let sync = match self.system_params.sync.take() { 47 Some(sync) => sync, 48 None => return, 49 }; 50 51 let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) 52 { 53 Ok(v) => v, 54 Err(e) => { 55 error!("failed creating kill Event pair: {}", e); 56 return; 57 } 58 }; 59 self.system_params.kill_evt = Some(self_kill_evt); 60 61 let thread_result = thread::Builder::new() 62 .name(format!("{} sync thread", self.debug_label())) 63 .spawn(move || { 64 let mut worker = SyncWorker { 65 kill_evt, 66 file: sync, 67 }; 68 worker.run(); 69 worker 70 }); 71 72 match thread_result { 73 Err(e) => { 74 error!("failed to spawn sync thread: {}", e); 75 } 76 Ok(sync_thread) => self.system_params.sync_thread = Some(sync_thread), 77 }; 78 } 79 } 80 } 81 82 impl SerialDevice for Serial { 83 /// Constructs a Serial device ready for input and output. 84 /// 85 /// The stream `input` should not block, instead returning 0 bytes if are no bytes available. new( _protection_type: ProtectionType, interrupt_evt: Event, input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> Serial86 fn new( 87 _protection_type: ProtectionType, 88 interrupt_evt: Event, 89 input: Option<Box<dyn SerialInput>>, 90 out: Option<Box<dyn io::Write + Send>>, 91 sync: Option<Box<dyn FileSync + Send>>, 92 options: SerialOptions, 93 _keep_rds: Vec<RawDescriptor>, 94 ) -> Serial { 95 let system_params = SystemSerialParams { 96 in_stream: None, 97 sync, 98 sync_thread: None, 99 kill_evt: None, 100 }; 101 Serial::new_common( 102 interrupt_evt, 103 input, 104 out, 105 options.out_timestamp, 106 system_params, 107 ) 108 } 109 110 /// Constructs a Serial device connected to a named pipe for I/O 111 /// 112 /// pipe_in and pipe_out should both refer to the same end of the same pipe, but may have 113 /// different underlying descriptors. new_with_pipe( _protection_type: ProtectionType, interrupt_evt: Event, pipe_in: PipeConnection, pipe_out: PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> Serial114 fn new_with_pipe( 115 _protection_type: ProtectionType, 116 interrupt_evt: Event, 117 pipe_in: PipeConnection, 118 pipe_out: PipeConnection, 119 _options: SerialOptions, 120 _keep_rds: Vec<RawDescriptor>, 121 ) -> Serial { 122 let system_params = SystemSerialParams { 123 in_stream: Some(Box::new(pipe_in)), 124 sync: None, 125 sync_thread: None, 126 kill_evt: None, 127 }; 128 let out_timestamp = false; 129 Serial::new_common( 130 interrupt_evt, 131 None, 132 Some(Box::new(pipe_out)), 133 out_timestamp, 134 system_params, 135 ) 136 } 137 } 138 139 impl Drop for Serial { drop(&mut self)140 fn drop(&mut self) { 141 if let Some(kill_evt) = self.system_params.kill_evt.take() { 142 // Ignore the result because there is nothing we can do about it. 143 let _ = kill_evt.signal(); 144 } 145 146 // TODO: only do this if serial stdin is enabled? 147 // SAFETY: We pass a valid file descriptor to `CancelIoEx`. 148 unsafe { 149 CancelIoEx(std::io::stdin().as_raw_descriptor(), std::ptr::null_mut()); 150 } 151 152 if let Some(sync_thread) = self.system_params.sync_thread.take() { 153 let _ = sync_thread.join(); 154 } 155 } 156 } 157 158 /// Worker to help with flusing contents of `file` to disk. 159 pub struct SyncWorker { 160 kill_evt: Event, 161 file: Box<dyn FileSync + Send>, 162 } 163 164 impl SyncWorker { run(&mut self)165 pub(in crate::serial) fn run(&mut self) { 166 let mut timer = match base::Timer::new() { 167 Err(e) => { 168 error!("failed to create timer for SyncWorker: {}", e); 169 return; 170 } 171 Ok(timer) => timer, 172 }; 173 174 if let Err(e) = timer.reset_repeating(Duration::from_secs(1)) { 175 error!("failed to set timer for SyncWorker: {}", e); 176 return; 177 } 178 179 #[derive(EventToken)] 180 enum Token { 181 Sync, 182 Kill, 183 } 184 185 let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[ 186 (&timer, Token::Sync), 187 (&self.kill_evt, Token::Kill), 188 ]) { 189 Ok(ec) => ec, 190 Err(e) => { 191 error!("failed creating WaitContext: {}", e); 192 return; 193 } 194 }; 195 loop { 196 let events = match wait_ctx.wait() { 197 Ok(v) => v, 198 Err(e) => { 199 error!("failed polling for events: {}", e); 200 return; 201 } 202 }; 203 204 for event in events.iter().filter(|e| e.is_readable) { 205 match event.token { 206 Token::Sync => { 207 timer.mark_waited().unwrap(); 208 if let Err(e) = self.file.fsync() { 209 error!("failed to fsync serial device, stopping sync thread: {}", e); 210 return; 211 } 212 } 213 Token::Kill => { 214 if let Err(e) = self.file.fsync() { 215 error!("failed to fsync serial device, stopping sync thread: {}", e); 216 return; 217 } 218 return; 219 } 220 } 221 } 222 } 223 } 224 } 225 226 #[cfg(test)] 227 mod tests { 228 use super::*; 229 use crate::serial::tests::*; 230 use crate::serial::*; 231 232 #[cfg(windows)] 233 #[test] named_pipe()234 fn named_pipe() { 235 use base::named_pipes; 236 use base::named_pipes::BlockingMode; 237 use base::named_pipes::FramingMode; 238 use rand::Rng; 239 240 let path_str = format!(r"\\.\pipe\crosvm_test_{}", rand::thread_rng().gen::<u64>()); 241 242 let pipe_in = named_pipes::create_server_pipe( 243 &path_str, 244 &FramingMode::Byte, 245 &BlockingMode::NoWait, 246 0, // default timeout 247 named_pipes::DEFAULT_BUFFER_SIZE, 248 false, 249 ) 250 .unwrap(); 251 252 let pipe_out = pipe_in.try_clone().unwrap(); 253 let event = Event::new().unwrap(); 254 255 let mut device = Serial::new_with_pipe( 256 ProtectionType::Unprotected, 257 event, 258 pipe_in, 259 pipe_out, 260 Default::default(), 261 Vec::new(), 262 ); 263 264 let client_pipe = named_pipes::create_client_pipe( 265 &path_str, 266 &FramingMode::Byte, 267 &BlockingMode::Wait, 268 false, 269 ) 270 .unwrap(); 271 272 // TODO(b/315998194): Add safety comment 273 #[allow(clippy::undocumented_unsafe_blocks)] 274 unsafe { 275 // Check that serial output is sent to the pipe 276 device.write(serial_bus_address(DATA), b"T"); 277 device.write(serial_bus_address(DATA), b"D"); 278 279 let mut read_buf: [u8; 2] = [0; 2]; 280 281 assert_eq!(client_pipe.read(&mut read_buf).unwrap(), 2); 282 assert_eq!(read_buf, [b'T', b'D']); 283 284 // Check that pipe_in is the other end of client_pipe. It's not actually wired up to 285 // SerialInput in this file so we can't test the data flow 286 client_pipe 287 .write(&[1, 2]) 288 .expect("Failed to write to client pipe"); 289 assert_eq!( 290 device 291 .system_params 292 .in_stream 293 .as_mut() 294 .unwrap() 295 .read(&mut read_buf) 296 .unwrap(), 297 2 298 ); 299 assert_eq!(read_buf, [1, 2]); 300 } 301 } 302 } 303