xref: /aosp_15_r20/external/crosvm/devices/src/virtio/vhost/user/device/snd.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 pub mod sys;
6 
7 use std::borrow::Borrow;
8 use std::rc::Rc;
9 
10 use anyhow::anyhow;
11 use anyhow::bail;
12 use anyhow::Context;
13 use base::error;
14 use base::warn;
15 use cros_async::sync::RwLock as AsyncRwLock;
16 use cros_async::EventAsync;
17 use cros_async::Executor;
18 use futures::channel::mpsc;
19 use futures::FutureExt;
20 use hypervisor::ProtectionType;
21 use serde::Deserialize;
22 use serde::Serialize;
23 pub use sys::run_snd_device;
24 pub use sys::Options;
25 use vm_memory::GuestMemory;
26 use vmm_vhost::message::VhostUserProtocolFeatures;
27 use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
28 use zerocopy::AsBytes;
29 
30 use crate::virtio;
31 use crate::virtio::copy_config;
32 use crate::virtio::device_constants::snd::virtio_snd_config;
33 use crate::virtio::snd::common_backend::async_funcs::handle_ctrl_queue;
34 use crate::virtio::snd::common_backend::async_funcs::handle_pcm_queue;
35 use crate::virtio::snd::common_backend::async_funcs::send_pcm_response_worker;
36 use crate::virtio::snd::common_backend::create_stream_info_builders;
37 use crate::virtio::snd::common_backend::hardcoded_snd_data;
38 use crate::virtio::snd::common_backend::hardcoded_virtio_snd_config;
39 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
40 use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder;
41 use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot;
42 use crate::virtio::snd::common_backend::Error;
43 use crate::virtio::snd::common_backend::PcmResponse;
44 use crate::virtio::snd::common_backend::SndData;
45 use crate::virtio::snd::common_backend::MAX_QUEUE_NUM;
46 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_PREPARE;
47 use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_START;
48 use crate::virtio::snd::parameters::Parameters;
49 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
50 use crate::virtio::vhost::user::device::handler::Error as DeviceError;
51 use crate::virtio::vhost::user::device::handler::VhostUserDevice;
52 use crate::virtio::vhost::user::device::handler::WorkerState;
53 use crate::virtio::vhost::user::VhostUserDeviceBuilder;
54 use crate::virtio::Queue;
55 
56 // Async workers:
57 // 0 - ctrl
58 // 1 - event
59 // 2 - tx
60 // 3 - rx
61 const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2;
62 struct SndBackend {
63     ex: Executor,
64     cfg: virtio_snd_config,
65     avail_features: u64,
66     workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; MAX_QUEUE_NUM],
67     // tx and rx
68     response_workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; 2],
69     snd_data: Rc<SndData>,
70     streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
71     tx_send: mpsc::UnboundedSender<PcmResponse>,
72     rx_send: mpsc::UnboundedSender<PcmResponse>,
73     tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
74     rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,
75     // Appended to logs for when there are mutliple audio devices.
76     card_index: usize,
77 }
78 
79 #[derive(Serialize, Deserialize)]
80 struct SndBackendSnapshot {
81     avail_features: u64,
82     stream_infos: Option<Vec<StreamInfoSnapshot>>,
83     snd_data: SndData,
84 }
85 
86 impl SndBackend {
new( ex: &Executor, params: Parameters, #[cfg(windows)] audio_client_guid: Option<String>, card_index: usize, ) -> anyhow::Result<Self>87     pub fn new(
88         ex: &Executor,
89         params: Parameters,
90         #[cfg(windows)] audio_client_guid: Option<String>,
91         card_index: usize,
92     ) -> anyhow::Result<Self> {
93         let cfg = hardcoded_virtio_snd_config(&params);
94         let avail_features = virtio::base_features(ProtectionType::Unprotected)
95             | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
96 
97         let snd_data = hardcoded_snd_data(&params);
98         let mut keep_rds = Vec::new();
99         let builders = create_stream_info_builders(&params, &snd_data, &mut keep_rds, card_index)?;
100 
101         if snd_data.pcm_info_len() != builders.len() {
102             error!(
103                 "[Card {}] snd: expected {} stream info builders, got {}",
104                 card_index,
105                 snd_data.pcm_info_len(),
106                 builders.len(),
107             )
108         }
109 
110         let streams = builders.into_iter();
111 
112         #[cfg(windows)]
113         let streams = streams
114             .map(|stream_builder| stream_builder.audio_client_guid(audio_client_guid.clone()));
115 
116         let streams = streams
117             .map(StreamInfoBuilder::build)
118             .map(AsyncRwLock::new)
119             .collect();
120         let streams = Rc::new(AsyncRwLock::new(streams));
121 
122         let (tx_send, tx_recv) = mpsc::unbounded();
123         let (rx_send, rx_recv) = mpsc::unbounded();
124 
125         Ok(SndBackend {
126             ex: ex.clone(),
127             cfg,
128             avail_features,
129             workers: Default::default(),
130             response_workers: Default::default(),
131             snd_data: Rc::new(snd_data),
132             streams,
133             tx_send,
134             rx_send,
135             tx_recv: Some(tx_recv),
136             rx_recv: Some(rx_recv),
137             card_index,
138         })
139     }
140 }
141 
142 impl VhostUserDeviceBuilder for SndBackend {
build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>>143     fn build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {
144         let handler = DeviceRequestHandler::new(*self);
145         Ok(Box::new(handler))
146     }
147 }
148 
149 impl VhostUserDevice for SndBackend {
max_queue_num(&self) -> usize150     fn max_queue_num(&self) -> usize {
151         MAX_QUEUE_NUM
152     }
153 
features(&self) -> u64154     fn features(&self) -> u64 {
155         self.avail_features
156     }
157 
protocol_features(&self) -> VhostUserProtocolFeatures158     fn protocol_features(&self) -> VhostUserProtocolFeatures {
159         VhostUserProtocolFeatures::CONFIG
160             | VhostUserProtocolFeatures::MQ
161             | VhostUserProtocolFeatures::DEVICE_STATE
162     }
163 
read_config(&self, offset: u64, data: &mut [u8])164     fn read_config(&self, offset: u64, data: &mut [u8]) {
165         copy_config(data, 0, self.cfg.as_bytes(), offset)
166     }
167 
reset(&mut self)168     fn reset(&mut self) {
169         for worker in self.workers.iter_mut().filter_map(Option::take) {
170             let _ = self.ex.run_until(worker.queue_task.cancel());
171         }
172     }
173 
start_queue( &mut self, idx: usize, queue: virtio::Queue, _mem: GuestMemory, ) -> anyhow::Result<()>174     fn start_queue(
175         &mut self,
176         idx: usize,
177         queue: virtio::Queue,
178         _mem: GuestMemory,
179     ) -> anyhow::Result<()> {
180         if self.workers[idx].is_some() {
181             warn!(
182                 "[Card {}] Starting new queue handler without stopping old handler",
183                 self.card_index
184             );
185             self.stop_queue(idx)?;
186         }
187 
188         let kick_evt = queue.event().try_clone().context(format!(
189             "[Card {}] failed to clone queue event",
190             self.card_index
191         ))?;
192         let mut kick_evt = EventAsync::new(kick_evt, &self.ex).context(format!(
193             "[Card {}] failed to create EventAsync for kick_evt",
194             self.card_index
195         ))?;
196         let queue = Rc::new(AsyncRwLock::new(queue));
197         let card_index = self.card_index;
198         let queue_task = match idx {
199             0 => {
200                 // ctrl queue
201                 let streams = self.streams.clone();
202                 let snd_data = self.snd_data.clone();
203                 let tx_send = self.tx_send.clone();
204                 let rx_send = self.rx_send.clone();
205                 let ctrl_queue = queue.clone();
206 
207                 let ex_clone = self.ex.clone();
208                 Some(self.ex.spawn_local(async move {
209                     handle_ctrl_queue(
210                         &ex_clone,
211                         &streams,
212                         &snd_data,
213                         ctrl_queue,
214                         &mut kick_evt,
215                         tx_send,
216                         rx_send,
217                         card_index,
218                         None,
219                     )
220                     .await
221                 }))
222             }
223             // TODO(woodychow): Add event queue support
224             //
225             // Note: Even though we don't support the event queue, we still need to keep track of
226             // the Queue so we can return it back in stop_queue. As such, we create a do nothing
227             // future to "run" this queue so that we track a WorkerState for it (which is how
228             // we return the Queue back).
229             1 => Some(self.ex.spawn_local(async move { Ok(()) })),
230             2 | 3 => {
231                 let (send, recv) = if idx == 2 {
232                     (self.tx_send.clone(), self.tx_recv.take())
233                 } else {
234                     (self.rx_send.clone(), self.rx_recv.take())
235                 };
236                 let mut recv = recv.ok_or_else(|| {
237                     anyhow!("[Card {}] queue restart is not supported", self.card_index)
238                 })?;
239                 let streams = Rc::clone(&self.streams);
240                 let queue_pcm_queue = queue.clone();
241                 let queue_task = self.ex.spawn_local(async move {
242                     handle_pcm_queue(&streams, send, queue_pcm_queue, &kick_evt, card_index, None)
243                         .await
244                 });
245 
246                 let queue_response_queue = queue.clone();
247                 let response_queue_task = self.ex.spawn_local(async move {
248                     send_pcm_response_worker(queue_response_queue, &mut recv, None).await
249                 });
250 
251                 self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState {
252                     queue_task: response_queue_task,
253                     queue: queue.clone(),
254                 });
255 
256                 Some(queue_task)
257             }
258             _ => bail!(
259                 "[Card {}] attempted to start unknown queue: {}",
260                 self.card_index,
261                 idx
262             ),
263         };
264 
265         if let Some(queue_task) = queue_task {
266             self.workers[idx] = Some(WorkerState { queue_task, queue });
267         }
268         Ok(())
269     }
270 
stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue>271     fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {
272         let worker_queue_rc = self
273             .workers
274             .get_mut(idx)
275             .and_then(Option::take)
276             .map(|worker| {
277                 // Wait for queue_task to be aborted.
278                 let _ = self.ex.run_until(worker.queue_task.cancel());
279                 worker.queue
280             });
281 
282         if idx == 2 || idx == 3 {
283             if let Some(worker) = self
284                 .response_workers
285                 .get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET)
286                 .and_then(Option::take)
287             {
288                 // Wait for queue_task to be aborted.
289                 let _ = self.ex.run_until(worker.queue_task.cancel());
290             }
291         }
292 
293         if let Some(queue_rc) = worker_queue_rc {
294             match Rc::try_unwrap(queue_rc) {
295                 Ok(queue_mutex) => Ok(queue_mutex.into_inner()),
296                 Err(_) => panic!(
297                     "[Card {}] failed to recover queue from worker",
298                     self.card_index
299                 ),
300             }
301         } else {
302             Err(anyhow::Error::new(DeviceError::WorkerNotFound))
303         }
304     }
305 
snapshot(&mut self) -> anyhow::Result<serde_json::Value>306     fn snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
307         // now_or_never will succeed here because no workers are running.
308         let stream_info_snaps = if let Some(stream_infos) = &self.streams.lock().now_or_never() {
309             let mut snaps = Vec::new();
310             for stream_info in stream_infos.iter() {
311                 snaps.push(
312                     stream_info
313                         .lock()
314                         .now_or_never()
315                         .unwrap_or_else(|| {
316                             panic!(
317                                 "[Card {}] failed to lock audio state during snapshot",
318                                 self.card_index
319                             )
320                         })
321                         .snapshot(),
322                 );
323             }
324             Some(snaps)
325         } else {
326             None
327         };
328         let snd_data_ref: &SndData = self.snd_data.borrow();
329         serde_json::to_value(SndBackendSnapshot {
330             avail_features: self.avail_features,
331             stream_infos: stream_info_snaps,
332             snd_data: snd_data_ref.clone(),
333         })
334         .context(format!(
335             "[Card {}] Failed to serialize SndBackendSnapshot",
336             self.card_index
337         ))
338     }
339 
restore(&mut self, data: serde_json::Value) -> anyhow::Result<()>340     fn restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
341         let deser: SndBackendSnapshot = serde_json::from_value(data).context(format!(
342             "[Card {}] Failed to deserialize SndBackendSnapshot",
343             self.card_index
344         ))?;
345         anyhow::ensure!(
346             deser.avail_features == self.avail_features,
347             "[Card {}] avail features doesn't match on restore: expected: {}, got: {}",
348             self.card_index,
349             deser.avail_features,
350             self.avail_features
351         );
352         let snd_data = self.snd_data.borrow();
353         anyhow::ensure!(
354             &deser.snd_data == snd_data,
355             "[Card {}] snd data doesn't match on restore: expected: {:?}, got: {:?}",
356             self.card_index,
357             deser.snd_data,
358             snd_data,
359         );
360 
361         let ex_clone = self.ex.clone();
362         let streams_rc = self.streams.clone();
363         let tx_send_clone = self.tx_send.clone();
364         let rx_send_clone = self.rx_send.clone();
365 
366         let card_index = self.card_index;
367         let restore_task = self.ex.spawn_local(async move {
368             if let Some(stream_infos) = &deser.stream_infos {
369                 for (stream, stream_info) in streams_rc.lock().await.iter().zip(stream_infos.iter())
370                 {
371                     stream.lock().await.restore(stream_info);
372                     if stream_info.state == VIRTIO_SND_R_PCM_START
373                         || stream_info.state == VIRTIO_SND_R_PCM_PREPARE
374                     {
375                         stream
376                             .lock()
377                             .await
378                             .prepare(&ex_clone, &tx_send_clone, &rx_send_clone)
379                             .await
380                             .unwrap_or_else(|_| {
381                                 panic!("[Card {}] failed to prepare PCM", card_index)
382                             });
383                     }
384                     if stream_info.state == VIRTIO_SND_R_PCM_START {
385                         stream.lock().await.start().await.unwrap_or_else(|_| {
386                             panic!("[Card {}] failed to start PCM", card_index)
387                         });
388                     }
389                 }
390             }
391         });
392         self.ex
393             .run_until(restore_task)
394             .unwrap_or_else(|_| panic!("[Card {}] failed to restore streams", self.card_index));
395         Ok(())
396     }
397 
enter_suspended_state(&mut self) -> anyhow::Result<()>398     fn enter_suspended_state(&mut self) -> anyhow::Result<()> {
399         // This device has no non-queue workers to stop.
400         Ok(())
401     }
402 }
403