// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Implements the CrosVM control socket on Windows. Unlike on unix, this is a bit involved because
//! we can't process the raw named pipe inline inside `run_control` (named pipes aren't directly
//! waitable). In theory, AF_UNIX can be made waitable, but AF_UNIX is very slow, and we already
//! have significant prior art for using named pipes in a waitable fashion (`base::StreamChannel`).

use std::io;
use std::sync::mpsc;
use std::sync::Arc;

use base::named_pipes;
use base::named_pipes::OverlappedWrapper;
use base::named_pipes::PipeConnection;
use base::BlockingMode;
use base::Event;
use base::EventExt;
use base::EventToken;
use base::FlushOnDropTube;
use base::FramingMode;
use base::ReadNotifier;
use base::RecvTube;
use base::SendTube;
use base::StreamChannel;
use base::Tube;
use base::TubeError;
use base::WaitContext;
use base::WorkerThread;
use libc::EIO;
use log::error;
use log::info;
use log::warn;
use sync::Mutex;
use vm_control::VmRequest;
use vm_control::VmResponse;
use winapi::shared::winerror::ERROR_MORE_DATA;

/// Windows named pipes don't fit in well with the control loop (`run_control`) the way sockets do
/// on unix, so this struct provides a compatibility layer (named pipe server) that functions very
/// similarly to how a socket server would on unix.
///
/// Terminology:
/// * The `ControlServer` is a socket server compatibility layer.
/// * The "control loop" is the VMM's main loop (`run_control`). It uses the `ControlServer` to
///   accept & service connections from clients that want to control the VMM (e.g. press the
///   power button, etc).
pub struct ControlServer {
    server_listener_worker: WorkerThread<(io::Result<()>, ClientWorker)>,
    /// Signaled when a client has connected and can be accepted without blocking.
    client_waiting: Event,
    /// Provides the accepted Tube every time a client connects.
    client_tube_channel: mpsc::Receiver<FlushOnDropTube>,
}

#[derive(EventToken)]
enum Token {
    Exit,
    Readable,
}

impl ControlServer {
    /// Creates a named pipe server on `pipe_name` that forwards Tube messages between the
    /// connected client on that pipe, and the Tube returned by `ControlServer::accept`.
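    ///
    /// A minimal usage sketch from the control loop's side (hypothetical pipe name; error
    /// handling elided):
    ///
    /// ```ignore
    /// let mut server = ControlServer::new(r"\\.\pipe\crosvm-ctrl")?;
    /// server.client_waiting().wait()?; // Signaled once a client has connected.
    /// let client = server.accept(); // `FlushOnDropTube`; field 0 is the underlying `Tube`.
    /// let req: VmRequest = client.0.recv()?;
    /// client.0.send(&VmResponse::Ok)?;
    /// ```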
    pub fn new(pipe_name: &str) -> anyhow::Result<Self> {
        let client_pipe_read = named_pipes::create_server_pipe(
            pipe_name,
            &named_pipes::FramingMode::Message,
            &named_pipes::BlockingMode::Wait,
            /* timeout= */ 0,
            /* buffer_size= */ 1024 * 1024,
            /* overlapped= */ true,
        )?;
        let client_pipe_write = client_pipe_read.try_clone()?;
        let mut client_worker = ClientWorker::new(client_pipe_write);
        let client_waiting = Event::new_auto_reset()?;
        let client_waiting_for_worker = client_waiting.try_clone()?;
        let (client_tube_channel_send, client_tube_channel_recv) = mpsc::channel();

        Ok(Self {
            server_listener_worker: WorkerThread::start("ctrl_srv_listen_loop", move |exit_evt| {
                let res = Self::server_listener_loop(
                    exit_evt,
                    &mut client_worker,
                    client_waiting_for_worker,
                    client_tube_channel_send,
                    client_pipe_read,
                );
                if let Err(e) = res.as_ref() {
                    error!("server_listener_worker failed with error: {:?}", e)
                }
                (res, client_worker)
            }),
            client_waiting,
            client_tube_channel: client_tube_channel_recv,
        })
    }

    /// Gets the client waiting event. If a client is waiting, [ControlServer::accept] can be
    /// called and will return a [base::Tube] without blocking.
    pub fn client_waiting(&self) -> &Event {
        &self.client_waiting
    }

    /// Accepts a connection (if one is waiting), returning a [base::Tube] connected to the client.
    /// If [ControlServer::client_waiting] has not been signaled, this will block until a client
    /// connects.
    pub fn accept(&mut self) -> FlushOnDropTube {
        self.client_tube_channel
            .recv()
            .expect("client worker has gone away")
    }

    /// Shuts down the control server, disconnecting any connected clients.
    pub fn shutdown(self) -> base::Result<()> {
        let (listen_res, client_worker) = self.server_listener_worker.stop();
        match listen_res {
            Err(e) if e.kind() == io::ErrorKind::Interrupted => (),
            Err(e) => return Err(base::Error::from(e)),
            Ok(()) => (),
        };
        client_worker.shutdown()
    }

    /// Listen loop for the control server. Handles waiting for new connections, creates the
    /// forwarding thread for control loop -> client data, and forwards client -> control loop
    /// data.
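    ///
    /// Data flow while a client is connected (one worker thread per direction):
    ///
    /// ```text
    /// client pipe --read_overlapped_blocking--> server_listener_loop --SendTube--> run_control
    /// run_control --RecvTube--> control_to_client_worker --pipe write--> client pipe
    /// ```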
    fn server_listener_loop(
        exit_evt: Event,
        client_worker: &mut ClientWorker,
        client_waiting: Event,
        client_tube_send_channel: mpsc::Sender<FlushOnDropTube>,
        mut client_pipe_read: PipeConnection,
    ) -> io::Result<()> {
        loop {
            info!("control server: started, waiting for clients.");
            client_pipe_read.wait_for_client_connection_overlapped_blocking(&exit_evt)?;

            let mut read_overlapped = OverlappedWrapper::new(true)?;
            let control_send = client_worker.connect_client(&client_tube_send_channel)?;
            client_waiting.signal()?;
            info!("control server: accepted client");

            loop {
                let recv_result = base::deserialize_and_recv::<VmRequest, _>(|buf| {
                    client_pipe_read.read_overlapped_blocking(
                        buf,
                        &mut read_overlapped,
                        &exit_evt,
                    )?;
                    Ok(buf.len())
                });

                match recv_result {
                    Ok(msg) => {
                        control_send.send(&msg).map_err(|e| {
                            error!("unexpected error in control server recv loop: {}", e);
                            io::Error::new(io::ErrorKind::Other, e)
                        })?;
                    }
                    Err(TubeError::Disconnected) => break,
                    Err(e) => {
                        error!("unexpected error in control server recv loop: {}", e);
                        return Err(io::Error::new(io::ErrorKind::Other, e));
                    }
                };
            }
            // Current client has disconnected. Now we can reuse the server pipe for a new client.
            match client_pipe_read.disconnect_clients() {
                Ok(()) => (),
                // If the pipe is already broken/closed, we'll get an error about trying to close
                // a pipe that has already been closed. Discard that error.
                Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (),
                Err(e) => return Err(e),
            }
            client_worker.stop_control_to_client_worker()?;
            info!("control server: disconnected client");
        }
        // Unreachable: the loop above only exits by returning an error.
    }
}

/// Handles connecting clients & forwarding data from client -> control server.
struct ClientWorker {
    control_to_client_worker: Option<WorkerThread<(base::Result<()>, PipeConnection)>>,
    client_pipe_write: Option<PipeConnection>,
}

impl ClientWorker {
    fn new(client_pipe_write: PipeConnection) -> Self {
        Self {
            control_to_client_worker: None,
            client_pipe_write: Some(client_pipe_write),
        }
    }

    fn connect_client(
        &mut self,
        client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>,
    ) -> base::Result<SendTube> {
        // It is critical that the server end of the pipe is returned as the Tube in
        // ControlServer::accept (tube_for_control_loop here). This way, we can ensure data is
        // flushed before the pipe is dropped. In short, the order of Tubes returned by the pair
        // matters.
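        //
        // Concretely: `tube_for_control_loop` is wrapped in `FlushOnDropTube` below before being
        // handed to `ControlServer::accept`, so when the control loop drops it, any response that
        // hasn't reached the client yet is flushed first.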
        let (tube_for_control_loop, tube_to_control_loop) = Tube::pair().map_err(|e| match e {
            TubeError::Pair(io_err) => base::Error::from(io_err),
            _ => base::Error::new(EIO),
        })?;

        let (control_send, control_recv) =
            Tube::split_to_send_recv(tube_to_control_loop).map_err(|e| match e {
                TubeError::Clone(io_err) => base::Error::from(io_err),
                _ => base::Error::new(EIO),
            })?;

        let client_pipe_write = self.client_pipe_write.take().expect("loop already running");
        self.control_to_client_worker = Some(WorkerThread::start(
            "ctrl_srv_client_to_ctrl",
            move |exit_evt| {
                let res =
                    Self::control_to_client_worker(exit_evt, &client_pipe_write, control_recv);
                if let Err(e) = res.as_ref() {
                    error!("control_to_client_worker exited with error: {:?}", e);
                }
                (res, client_pipe_write)
            },
        ));
        client_tube_send_channel
            .send(FlushOnDropTube::from(tube_for_control_loop))
            .expect("control server has gone away");
        Ok(control_send)
    }

    fn stop_control_to_client_worker(&mut self) -> base::Result<()> {
        let (res, pipe) = self
            .control_to_client_worker
            .take()
            .expect("loop must be running")
            .stop();
        self.client_pipe_write = Some(pipe);
        res
    }

    fn shutdown(self) -> base::Result<()> {
        if let Some(worker) = self.control_to_client_worker {
            worker.stop().0
        } else {
            Ok(())
        }
    }

    /// Worker that forwards data from the control loop -> client pipe.
    fn control_to_client_worker(
        exit_evt: Event,
        client_pipe_write: &PipeConnection,
        control_recv: RecvTube,
    ) -> base::Result<()> {
        let wait_ctx = WaitContext::new()?;
        wait_ctx.add(&exit_evt, Token::Exit)?;
        wait_ctx.add(control_recv.get_read_notifier(), Token::Readable)?;

        'poll: loop {
            let events = wait_ctx.wait()?;
            for event in events {
                match event.token {
                    Token::Exit => {
                        break 'poll;
                    }
                    Token::Readable => {
                        let msg = match control_recv.recv::<VmResponse>() {
                            Ok(msg) => Ok(msg),
                            Err(TubeError::Disconnected) => {
                                return Ok(());
                            }
                            Err(TubeError::Recv(e)) => Err(base::Error::from(e)),
                            Err(tube_error) => {
                                error!(
                                    "unexpected error in control server recv loop: {}",
                                    tube_error
                                );
                                Err(base::Error::new(EIO))
                            }
                        }?;
                        base::serialize_and_send(|buf| client_pipe_write.write(buf), &msg, None)
                            .map_err(|e| match e {
                                TubeError::Send(e) => base::Error::from(e),
                                tube_error => {
                                    error!(
                                        "unexpected error in control server recv loop: {}",
                                        tube_error
                                    );
                                    base::Error::new(EIO)
                                }
                            })?;
                    }
                }
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use base::PipeTube;
    use rand::Rng;

    use super::*;

    fn generate_pipe_name() -> String {
        format!(
            r"\\.\pipe\test-ipc-pipe-name.rand{}",
            rand::thread_rng().gen::<u64>(),
        )
    }

    #[track_caller]
    fn create_client(pipe_name: &str) -> PipeTube {
        let mut last_error: Option<io::Error> = None;
        for _ in 0..5 {
            match named_pipes::create_client_pipe(
                pipe_name,
                &named_pipes::FramingMode::Message,
                &named_pipes::BlockingMode::Wait,
                /* overlapped= */ false,
            ) {
                Ok(pipe) => return PipeTube::from(pipe, None),
                Err(e) => {
                    last_error = Some(e);
                    println!("failed client connection");
                    thread::sleep(Duration::from_millis(100))
                }
            }
        }
        panic!(
            "failed to connect to control server: {:?}",
            last_error.unwrap()
        )
    }

    #[test]
    fn test_smoke() {
        // There are several threads, so run many iterations to exercise any possible race
        // conditions.
        for i in 0..100 {
            println!("starting iteration {}", i);
            let pipe_name = generate_pipe_name();

            let mut control_server = ControlServer::new(&pipe_name).unwrap();
            let fake_control_loop = base::thread::spawn_with_timeout(move || {
                // First client.
                {
                    println!("server: starting client 1");
                    control_server.client_waiting().wait().unwrap();
                    let client1 = control_server.accept();
                    let req: VmRequest = client1.0.recv().unwrap();
                    assert!(matches!(req, VmRequest::Powerbtn));
                    client1.0.send(&VmResponse::Ok).unwrap();
                }
                println!("server: finished client 1");

                // Second client.
                {
                    println!("server: starting client 2");
                    control_server.client_waiting().wait().unwrap();
                    let client2 = control_server.accept();
                    let req: VmRequest = client2.0.recv().unwrap();
                    assert!(matches!(req, VmRequest::Exit));
                    client2
                        .0
                        .send(&VmResponse::ErrString("err".to_owned()))
                        .unwrap();
                }
                println!("server: finished client 2");
                control_server
            });

            {
                println!("client: starting client 1");
                let client1 = create_client(&pipe_name);
                client1.send(&VmRequest::Powerbtn).unwrap();
                assert!(matches!(client1.recv().unwrap(), VmResponse::Ok));
                println!("client: finished client 1");
            }

            {
                println!("client: starting client 2");
                let client2 = create_client(&pipe_name);
                client2.send(&VmRequest::Exit).unwrap();
                // A bare identifier in `matches!` is a binding pattern that matches anything, so
                // match the variant and check its payload explicitly instead.
                let resp: VmResponse = client2.recv().unwrap();
                assert!(matches!(resp, VmResponse::ErrString(msg) if msg == "err"));
                println!("client: finished client 2");
            }

            let control_server = fake_control_loop.try_join(Duration::from_secs(2)).unwrap();
            control_server.shutdown().unwrap();
            println!("completed iteration {}", i);
        }
    }
}