1 // Copyright 2021 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 pub mod sys; 6 7 use std::borrow::Borrow; 8 use std::rc::Rc; 9 10 use anyhow::anyhow; 11 use anyhow::bail; 12 use anyhow::Context; 13 use base::error; 14 use base::warn; 15 use cros_async::sync::RwLock as AsyncRwLock; 16 use cros_async::EventAsync; 17 use cros_async::Executor; 18 use futures::channel::mpsc; 19 use futures::FutureExt; 20 use hypervisor::ProtectionType; 21 use serde::Deserialize; 22 use serde::Serialize; 23 pub use sys::run_snd_device; 24 pub use sys::Options; 25 use vm_memory::GuestMemory; 26 use vmm_vhost::message::VhostUserProtocolFeatures; 27 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES; 28 use zerocopy::AsBytes; 29 30 use crate::virtio; 31 use crate::virtio::copy_config; 32 use crate::virtio::device_constants::snd::virtio_snd_config; 33 use crate::virtio::snd::common_backend::async_funcs::handle_ctrl_queue; 34 use crate::virtio::snd::common_backend::async_funcs::handle_pcm_queue; 35 use crate::virtio::snd::common_backend::async_funcs::send_pcm_response_worker; 36 use crate::virtio::snd::common_backend::create_stream_info_builders; 37 use crate::virtio::snd::common_backend::hardcoded_snd_data; 38 use crate::virtio::snd::common_backend::hardcoded_virtio_snd_config; 39 use crate::virtio::snd::common_backend::stream_info::StreamInfo; 40 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder; 41 use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot; 42 use crate::virtio::snd::common_backend::Error; 43 use crate::virtio::snd::common_backend::PcmResponse; 44 use crate::virtio::snd::common_backend::SndData; 45 use crate::virtio::snd::common_backend::MAX_QUEUE_NUM; 46 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_PREPARE; 47 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_START; 48 use crate::virtio::snd::parameters::Parameters; 49 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler; 50 use crate::virtio::vhost::user::device::handler::Error as DeviceError; 51 use crate::virtio::vhost::user::device::handler::VhostUserDevice; 52 use crate::virtio::vhost::user::device::handler::WorkerState; 53 use crate::virtio::vhost::user::VhostUserDeviceBuilder; 54 use crate::virtio::Queue; 55 56 // Async workers: 57 // 0 - ctrl 58 // 1 - event 59 // 2 - tx 60 // 3 - rx 61 const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2; 62 struct SndBackend { 63 ex: Executor, 64 cfg: virtio_snd_config, 65 avail_features: u64, 66 workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; MAX_QUEUE_NUM], 67 // tx and rx 68 response_workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; 2], 69 snd_data: Rc<SndData>, 70 streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, 71 tx_send: mpsc::UnboundedSender<PcmResponse>, 72 rx_send: mpsc::UnboundedSender<PcmResponse>, 73 tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>, 74 rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>, 75 // Appended to logs for when there are mutliple audio devices. 76 card_index: usize, 77 } 78 79 #[derive(Serialize, Deserialize)] 80 struct SndBackendSnapshot { 81 avail_features: u64, 82 stream_infos: Option<Vec<StreamInfoSnapshot>>, 83 snd_data: SndData, 84 } 85 86 impl SndBackend { new( ex: &Executor, params: Parameters, #[cfg(windows)] audio_client_guid: Option<String>, card_index: usize, ) -> anyhow::Result<Self>87 pub fn new( 88 ex: &Executor, 89 params: Parameters, 90 #[cfg(windows)] audio_client_guid: Option<String>, 91 card_index: usize, 92 ) -> anyhow::Result<Self> { 93 let cfg = hardcoded_virtio_snd_config(¶ms); 94 let avail_features = virtio::base_features(ProtectionType::Unprotected) 95 | 1 << VHOST_USER_F_PROTOCOL_FEATURES; 96 97 let snd_data = hardcoded_snd_data(¶ms); 98 let mut keep_rds = Vec::new(); 99 let builders = create_stream_info_builders(¶ms, &snd_data, &mut keep_rds, card_index)?; 100 101 if snd_data.pcm_info_len() != builders.len() { 102 error!( 103 "[Card {}] snd: expected {} stream info builders, got {}", 104 card_index, 105 snd_data.pcm_info_len(), 106 builders.len(), 107 ) 108 } 109 110 let streams = builders.into_iter(); 111 112 #[cfg(windows)] 113 let streams = streams 114 .map(|stream_builder| stream_builder.audio_client_guid(audio_client_guid.clone())); 115 116 let streams = streams 117 .map(StreamInfoBuilder::build) 118 .map(AsyncRwLock::new) 119 .collect(); 120 let streams = Rc::new(AsyncRwLock::new(streams)); 121 122 let (tx_send, tx_recv) = mpsc::unbounded(); 123 let (rx_send, rx_recv) = mpsc::unbounded(); 124 125 Ok(SndBackend { 126 ex: ex.clone(), 127 cfg, 128 avail_features, 129 workers: Default::default(), 130 response_workers: Default::default(), 131 snd_data: Rc::new(snd_data), 132 streams, 133 tx_send, 134 rx_send, 135 tx_recv: Some(tx_recv), 136 rx_recv: Some(rx_recv), 137 card_index, 138 }) 139 } 140 } 141 142 impl VhostUserDeviceBuilder for SndBackend { build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>>143 fn build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> { 144 let handler = DeviceRequestHandler::new(*self); 145 Ok(Box::new(handler)) 146 } 147 } 148 149 impl VhostUserDevice for SndBackend { max_queue_num(&self) -> usize150 fn max_queue_num(&self) -> usize { 151 MAX_QUEUE_NUM 152 } 153 features(&self) -> u64154 fn features(&self) -> u64 { 155 self.avail_features 156 } 157 protocol_features(&self) -> VhostUserProtocolFeatures158 fn protocol_features(&self) -> VhostUserProtocolFeatures { 159 VhostUserProtocolFeatures::CONFIG 160 | VhostUserProtocolFeatures::MQ 161 | VhostUserProtocolFeatures::DEVICE_STATE 162 } 163 read_config(&self, offset: u64, data: &mut [u8])164 fn read_config(&self, offset: u64, data: &mut [u8]) { 165 copy_config(data, 0, self.cfg.as_bytes(), offset) 166 } 167 reset(&mut self)168 fn reset(&mut self) { 169 for worker in self.workers.iter_mut().filter_map(Option::take) { 170 let _ = self.ex.run_until(worker.queue_task.cancel()); 171 } 172 } 173 start_queue( &mut self, idx: usize, queue: virtio::Queue, _mem: GuestMemory, ) -> anyhow::Result<()>174 fn start_queue( 175 &mut self, 176 idx: usize, 177 queue: virtio::Queue, 178 _mem: GuestMemory, 179 ) -> anyhow::Result<()> { 180 if self.workers[idx].is_some() { 181 warn!( 182 "[Card {}] Starting new queue handler without stopping old handler", 183 self.card_index 184 ); 185 self.stop_queue(idx)?; 186 } 187 188 let kick_evt = queue.event().try_clone().context(format!( 189 "[Card {}] failed to clone queue event", 190 self.card_index 191 ))?; 192 let mut kick_evt = EventAsync::new(kick_evt, &self.ex).context(format!( 193 "[Card {}] failed to create EventAsync for kick_evt", 194 self.card_index 195 ))?; 196 let queue = Rc::new(AsyncRwLock::new(queue)); 197 let card_index = self.card_index; 198 let queue_task = match idx { 199 0 => { 200 // ctrl queue 201 let streams = self.streams.clone(); 202 let snd_data = self.snd_data.clone(); 203 let tx_send = self.tx_send.clone(); 204 let rx_send = self.rx_send.clone(); 205 let ctrl_queue = queue.clone(); 206 207 let ex_clone = self.ex.clone(); 208 Some(self.ex.spawn_local(async move { 209 handle_ctrl_queue( 210 &ex_clone, 211 &streams, 212 &snd_data, 213 ctrl_queue, 214 &mut kick_evt, 215 tx_send, 216 rx_send, 217 card_index, 218 None, 219 ) 220 .await 221 })) 222 } 223 // TODO(woodychow): Add event queue support 224 // 225 // Note: Even though we don't support the event queue, we still need to keep track of 226 // the Queue so we can return it back in stop_queue. As such, we create a do nothing 227 // future to "run" this queue so that we track a WorkerState for it (which is how 228 // we return the Queue back). 229 1 => Some(self.ex.spawn_local(async move { Ok(()) })), 230 2 | 3 => { 231 let (send, recv) = if idx == 2 { 232 (self.tx_send.clone(), self.tx_recv.take()) 233 } else { 234 (self.rx_send.clone(), self.rx_recv.take()) 235 }; 236 let mut recv = recv.ok_or_else(|| { 237 anyhow!("[Card {}] queue restart is not supported", self.card_index) 238 })?; 239 let streams = Rc::clone(&self.streams); 240 let queue_pcm_queue = queue.clone(); 241 let queue_task = self.ex.spawn_local(async move { 242 handle_pcm_queue(&streams, send, queue_pcm_queue, &kick_evt, card_index, None) 243 .await 244 }); 245 246 let queue_response_queue = queue.clone(); 247 let response_queue_task = self.ex.spawn_local(async move { 248 send_pcm_response_worker(queue_response_queue, &mut recv, None).await 249 }); 250 251 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState { 252 queue_task: response_queue_task, 253 queue: queue.clone(), 254 }); 255 256 Some(queue_task) 257 } 258 _ => bail!( 259 "[Card {}] attempted to start unknown queue: {}", 260 self.card_index, 261 idx 262 ), 263 }; 264 265 if let Some(queue_task) = queue_task { 266 self.workers[idx] = Some(WorkerState { queue_task, queue }); 267 } 268 Ok(()) 269 } 270 stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue>271 fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> { 272 let worker_queue_rc = self 273 .workers 274 .get_mut(idx) 275 .and_then(Option::take) 276 .map(|worker| { 277 // Wait for queue_task to be aborted. 278 let _ = self.ex.run_until(worker.queue_task.cancel()); 279 worker.queue 280 }); 281 282 if idx == 2 || idx == 3 { 283 if let Some(worker) = self 284 .response_workers 285 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET) 286 .and_then(Option::take) 287 { 288 // Wait for queue_task to be aborted. 289 let _ = self.ex.run_until(worker.queue_task.cancel()); 290 } 291 } 292 293 if let Some(queue_rc) = worker_queue_rc { 294 match Rc::try_unwrap(queue_rc) { 295 Ok(queue_mutex) => Ok(queue_mutex.into_inner()), 296 Err(_) => panic!( 297 "[Card {}] failed to recover queue from worker", 298 self.card_index 299 ), 300 } 301 } else { 302 Err(anyhow::Error::new(DeviceError::WorkerNotFound)) 303 } 304 } 305 snapshot(&mut self) -> anyhow::Result<serde_json::Value>306 fn snapshot(&mut self) -> anyhow::Result<serde_json::Value> { 307 // now_or_never will succeed here because no workers are running. 308 let stream_info_snaps = if let Some(stream_infos) = &self.streams.lock().now_or_never() { 309 let mut snaps = Vec::new(); 310 for stream_info in stream_infos.iter() { 311 snaps.push( 312 stream_info 313 .lock() 314 .now_or_never() 315 .unwrap_or_else(|| { 316 panic!( 317 "[Card {}] failed to lock audio state during snapshot", 318 self.card_index 319 ) 320 }) 321 .snapshot(), 322 ); 323 } 324 Some(snaps) 325 } else { 326 None 327 }; 328 let snd_data_ref: &SndData = self.snd_data.borrow(); 329 serde_json::to_value(SndBackendSnapshot { 330 avail_features: self.avail_features, 331 stream_infos: stream_info_snaps, 332 snd_data: snd_data_ref.clone(), 333 }) 334 .context(format!( 335 "[Card {}] Failed to serialize SndBackendSnapshot", 336 self.card_index 337 )) 338 } 339 restore(&mut self, data: serde_json::Value) -> anyhow::Result<()>340 fn restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> { 341 let deser: SndBackendSnapshot = serde_json::from_value(data).context(format!( 342 "[Card {}] Failed to deserialize SndBackendSnapshot", 343 self.card_index 344 ))?; 345 anyhow::ensure!( 346 deser.avail_features == self.avail_features, 347 "[Card {}] avail features doesn't match on restore: expected: {}, got: {}", 348 self.card_index, 349 deser.avail_features, 350 self.avail_features 351 ); 352 let snd_data = self.snd_data.borrow(); 353 anyhow::ensure!( 354 &deser.snd_data == snd_data, 355 "[Card {}] snd data doesn't match on restore: expected: {:?}, got: {:?}", 356 self.card_index, 357 deser.snd_data, 358 snd_data, 359 ); 360 361 let ex_clone = self.ex.clone(); 362 let streams_rc = self.streams.clone(); 363 let tx_send_clone = self.tx_send.clone(); 364 let rx_send_clone = self.rx_send.clone(); 365 366 let card_index = self.card_index; 367 let restore_task = self.ex.spawn_local(async move { 368 if let Some(stream_infos) = &deser.stream_infos { 369 for (stream, stream_info) in streams_rc.lock().await.iter().zip(stream_infos.iter()) 370 { 371 stream.lock().await.restore(stream_info); 372 if stream_info.state == VIRTIO_SND_R_PCM_START 373 || stream_info.state == VIRTIO_SND_R_PCM_PREPARE 374 { 375 stream 376 .lock() 377 .await 378 .prepare(&ex_clone, &tx_send_clone, &rx_send_clone) 379 .await 380 .unwrap_or_else(|_| { 381 panic!("[Card {}] failed to prepare PCM", card_index) 382 }); 383 } 384 if stream_info.state == VIRTIO_SND_R_PCM_START { 385 stream.lock().await.start().await.unwrap_or_else(|_| { 386 panic!("[Card {}] failed to start PCM", card_index) 387 }); 388 } 389 } 390 } 391 }); 392 self.ex 393 .run_until(restore_task) 394 .unwrap_or_else(|_| panic!("[Card {}] failed to restore streams", self.card_index)); 395 Ok(()) 396 } 397 enter_suspended_state(&mut self) -> anyhow::Result<()>398 fn enter_suspended_state(&mut self) -> anyhow::Result<()> { 399 // This device has no non-queue workers to stop. 400 Ok(()) 401 } 402 } 403