xref: /aosp_15_r20/external/crosvm/devices/src/virtio/snd/vios_backend/worker.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9 
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use zerocopy::AsBytes;
18 
19 use super::super::constants::*;
20 use super::super::layout::*;
21 use super::streams::*;
22 use super::Result;
23 use super::SoundError;
24 use super::*;
25 use crate::virtio::DescriptorChain;
26 use crate::virtio::Interrupt;
27 use crate::virtio::Queue;
28 
29 pub struct Worker {
30     // Lock order: Must never hold more than one queue lock at the same time.
31     interrupt: Interrupt,
32     pub control_queue: Arc<Mutex<Queue>>,
33     pub event_queue: Option<Queue>,
34     vios_client: Arc<Mutex<VioSClient>>,
35     streams: Vec<StreamProxy>,
36     pub tx_queue: Arc<Mutex<Queue>>,
37     pub rx_queue: Arc<Mutex<Queue>>,
38     io_thread: Option<thread::JoinHandle<Result<()>>>,
39     io_kill: Event,
40     // saved_stream_state holds the previous state of streams. When the sound device is newly
41     // created, this will be empty. It will only contain state if the sound device is put to sleep
42     // OR if we restore a VM.
43     pub saved_stream_state: Vec<StreamSnapshot>,
44 }
45 
46 impl Worker {
47     /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<Mutex<VioSClient>>, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, event_queue: Queue, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, saved_stream_state: Vec<StreamSnapshot>, ) -> Result<Worker>48     pub fn try_new(
49         vios_client: Arc<Mutex<VioSClient>>,
50         interrupt: Interrupt,
51         control_queue: Arc<Mutex<Queue>>,
52         event_queue: Queue,
53         tx_queue: Arc<Mutex<Queue>>,
54         rx_queue: Arc<Mutex<Queue>>,
55         saved_stream_state: Vec<StreamSnapshot>,
56     ) -> Result<Worker> {
57         let num_streams = vios_client.lock().num_streams();
58         let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);
59         {
60             for stream_id in 0..num_streams {
61                 let capture = vios_client
62                     .lock()
63                     .stream_info(stream_id)
64                     .map(|i| i.direction == VIRTIO_SND_D_INPUT)
65                     .unwrap_or(false);
66                 let io_queue = if capture { &rx_queue } else { &tx_queue };
67                 streams.push(Stream::try_new(
68                     stream_id,
69                     vios_client.clone(),
70                     control_queue.clone(),
71                     io_queue.clone(),
72                     capture,
73                     saved_stream_state.get(stream_id as usize).cloned(),
74                 )?);
75             }
76         }
77         let (self_kill_io, kill_io) = Event::new()
78             .and_then(|e| Ok((e.try_clone()?, e)))
79             .map_err(SoundError::CreateEvent)?;
80 
81         let senders: Vec<Sender<Box<StreamMsg>>> =
82             streams.iter().map(|sp| sp.msg_sender().clone()).collect();
83         let tx_queue_thread = tx_queue.clone();
84         let rx_queue_thread = rx_queue.clone();
85         let io_thread = thread::Builder::new()
86             .name("v_snd_io".to_string())
87             .spawn(move || {
88                 try_set_real_time_priority();
89 
90                 io_loop(tx_queue_thread, rx_queue_thread, senders, kill_io)
91             })
92             .map_err(SoundError::CreateThread)?;
93         Ok(Worker {
94             interrupt,
95             control_queue,
96             event_queue: Some(event_queue),
97             vios_client,
98             streams,
99             tx_queue,
100             rx_queue,
101             io_thread: Some(io_thread),
102             io_kill: self_kill_io,
103             saved_stream_state: Vec::new(),
104         })
105     }
106 
107     /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
108     /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>109     pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
110         let event_notifier = self
111             .vios_client
112             .lock()
113             .get_event_notifier()
114             .map_err(SoundError::ClientEventNotifier)?;
115         #[derive(EventToken)]
116         enum Token {
117             ControlQAvailable,
118             EventQAvailable,
119             InterruptResample,
120             EventTriggered,
121             Kill,
122         }
123         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
124             (self.control_queue.lock().event(), Token::ControlQAvailable),
125             (
126                 self.event_queue.as_ref().expect("queue missing").event(),
127                 Token::EventQAvailable,
128             ),
129             (&event_notifier, Token::EventTriggered),
130             (&kill_evt, Token::Kill),
131         ])
132         .map_err(SoundError::WaitCtx)?;
133 
134         if let Some(resample_evt) = self.interrupt.get_resample_evt() {
135             wait_ctx
136                 .add(resample_evt, Token::InterruptResample)
137                 .map_err(SoundError::WaitCtx)?;
138         }
139         let mut event_queue = self.event_queue.take().expect("event_queue missing");
140         'wait: loop {
141             let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
142 
143             for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
144                 match wait_evt.token {
145                     Token::ControlQAvailable => {
146                         self.control_queue
147                             .lock()
148                             .event()
149                             .wait()
150                             .map_err(SoundError::QueueEvt)?;
151                         self.process_controlq_buffers()?;
152                     }
153                     Token::EventQAvailable => {
154                         // Just read from the event object to make sure the producer of such events
155                         // never blocks. The buffers will only be used when actual virtio-snd
156                         // events are triggered.
157                         event_queue.event().wait().map_err(SoundError::QueueEvt)?;
158                     }
159                     Token::EventTriggered => {
160                         event_notifier.wait().map_err(SoundError::QueueEvt)?;
161                         self.process_event_triggered(&mut event_queue)?;
162                     }
163                     Token::InterruptResample => {
164                         self.interrupt.interrupt_resample();
165                     }
166                     Token::Kill => {
167                         let _ = kill_evt.wait();
168                         break 'wait;
169                     }
170                 }
171             }
172         }
173         self.saved_stream_state = self
174             .streams
175             .drain(..)
176             .map(|stream| stream.stop_thread())
177             .collect();
178         self.event_queue = Some(event_queue);
179         Ok(())
180     }
181 
stop_io_thread(&mut self)182     fn stop_io_thread(&mut self) {
183         if let Err(e) = self.io_kill.signal() {
184             error!(
185                 "virtio-snd: Failed to send Break msg to stream thread: {}",
186                 e
187             );
188         }
189         if let Some(th) = self.io_thread.take() {
190             match th.join() {
191                 Err(e) => {
192                     error!("virtio-snd: Panic detected on stream thread: {:?}", e);
193                 }
194                 Ok(r) => {
195                     if let Err(e) = r {
196                         error!("virtio-snd: IO thread exited with and error: {}", e);
197                     }
198                 }
199             }
200         }
201     }
202 
203     // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
204     // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>205     fn process_controlq_buffers(&mut self) -> Result<()> {
206         while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {
207             let reader = &mut avail_desc.reader;
208             let available_bytes = reader.available_bytes();
209             if available_bytes < std::mem::size_of::<virtio_snd_hdr>() {
210                 error!(
211                     "virtio-snd: Message received on control queue is too small: {}",
212                     available_bytes
213                 );
214                 return reply_control_op_status(
215                     VIRTIO_SND_S_BAD_MSG,
216                     avail_desc,
217                     &self.control_queue,
218                 );
219             }
220             let mut read_buf = vec![0u8; available_bytes];
221             reader
222                 .read_exact(&mut read_buf)
223                 .map_err(SoundError::QueueIO)?;
224             let mut code: Le32 = Default::default();
225             // need to copy because the buffer may not be properly aligned
226             code.as_bytes_mut()
227                 .copy_from_slice(&read_buf[..std::mem::size_of::<Le32>()]);
228             let request_type = code.to_native();
229             match request_type {
230                 VIRTIO_SND_R_JACK_INFO => {
231                     let (code, info_vec) = {
232                         match self.parse_info_query(&read_buf) {
233                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
234                             Some((start_id, count)) => {
235                                 let end_id = start_id.saturating_add(count);
236                                 if end_id > self.vios_client.lock().num_jacks() {
237                                     error!(
238                                         "virtio-snd: Requested info on invalid jacks ids: {}..{}",
239                                         start_id,
240                                         end_id - 1
241                                     );
242                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
243                                 } else {
244                                     (
245                                         VIRTIO_SND_S_OK,
246                                         // Safe to unwrap because we just ensured all the ids are
247                                         // valid
248                                         (start_id..end_id)
249                                             .map(|id| {
250                                                 self.vios_client.lock().jack_info(id).unwrap()
251                                             })
252                                             .collect(),
253                                     )
254                                 }
255                             }
256                         }
257                     };
258                     self.send_info_reply(avail_desc, code, info_vec)?;
259                 }
260                 VIRTIO_SND_R_JACK_REMAP => {
261                     let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
262                         error!(
263                         "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
264                         read_buf.len()
265                         );
266                         VIRTIO_SND_S_BAD_MSG
267                     } else {
268                         let mut request: virtio_snd_jack_remap = Default::default();
269                         request.as_bytes_mut().copy_from_slice(&read_buf);
270                         let jack_id = request.hdr.jack_id.to_native();
271                         let association = request.association.to_native();
272                         let sequence = request.sequence.to_native();
273                         if let Err(e) =
274                             self.vios_client
275                                 .lock()
276                                 .remap_jack(jack_id, association, sequence)
277                         {
278                             error!("virtio-snd: Failed to remap jack: {}", e);
279                             vios_error_to_status_code(e)
280                         } else {
281                             VIRTIO_SND_S_OK
282                         }
283                     };
284                     let writer = &mut avail_desc.writer;
285                     writer
286                         .write_obj(virtio_snd_hdr {
287                             code: Le32::from(code),
288                         })
289                         .map_err(SoundError::QueueIO)?;
290                     let len = writer.bytes_written() as u32;
291                     {
292                         let mut queue_lock = self.control_queue.lock();
293                         queue_lock.add_used(avail_desc, len);
294                         queue_lock.trigger_interrupt();
295                     }
296                 }
297                 VIRTIO_SND_R_CHMAP_INFO => {
298                     let (code, info_vec) = {
299                         match self.parse_info_query(&read_buf) {
300                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
301                             Some((start_id, count)) => {
302                                 let end_id = start_id.saturating_add(count);
303                                 let num_chmaps = self.vios_client.lock().num_chmaps();
304                                 if end_id > num_chmaps {
305                                     error!(
306                                         "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
307                                         start_id,
308                                         end_id - 1
309                                     );
310                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
311                                 } else {
312                                     (
313                                         VIRTIO_SND_S_OK,
314                                         // Safe to unwrap because we just ensured all the ids are
315                                         // valid
316                                         (start_id..end_id)
317                                             .map(|id| {
318                                                 self.vios_client.lock().chmap_info(id).unwrap()
319                                             })
320                                             .collect(),
321                                     )
322                                 }
323                             }
324                         }
325                     };
326                     self.send_info_reply(avail_desc, code, info_vec)?;
327                 }
328                 VIRTIO_SND_R_PCM_INFO => {
329                     let (code, info_vec) = {
330                         match self.parse_info_query(&read_buf) {
331                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
332                             Some((start_id, count)) => {
333                                 let end_id = start_id.saturating_add(count);
334                                 if end_id > self.vios_client.lock().num_streams() {
335                                     error!(
336                                         "virtio-snd: Requested info on invalid stream ids: {}..{}",
337                                         start_id,
338                                         end_id - 1
339                                     );
340                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
341                                 } else {
342                                     (
343                                         VIRTIO_SND_S_OK,
344                                         // Safe to unwrap because we just ensured all the ids are
345                                         // valid
346                                         (start_id..end_id)
347                                             .map(|id| {
348                                                 self.vios_client.lock().stream_info(id).unwrap()
349                                             })
350                                             .collect(),
351                                     )
352                                 }
353                             }
354                         }
355                     };
356                     self.send_info_reply(avail_desc, code, info_vec)?;
357                 }
358                 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
359                 VIRTIO_SND_R_PCM_PREPARE => {
360                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
361                 }
362                 VIRTIO_SND_R_PCM_RELEASE => {
363                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
364                 }
365                 VIRTIO_SND_R_PCM_START => {
366                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
367                 }
368                 VIRTIO_SND_R_PCM_STOP => {
369                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
370                 }
371                 _ => {
372                     error!(
373                         "virtio-snd: Unknown control queue mesage code: {}",
374                         request_type
375                     );
376                     reply_control_op_status(
377                         VIRTIO_SND_S_NOT_SUPP,
378                         avail_desc,
379                         &self.control_queue,
380                     )?;
381                 }
382             }
383         }
384         Ok(())
385     }
386 
process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()>387     fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {
388         while let Some(evt) = self.vios_client.lock().pop_event() {
389             if let Some(mut desc) = event_queue.pop() {
390                 let writer = &mut desc.writer;
391                 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
392                 let len = writer.bytes_written() as u32;
393                 event_queue.add_used(desc, len);
394                 event_queue.trigger_interrupt();
395             } else {
396                 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
397             }
398         }
399         Ok(())
400     }
401 
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>402     fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
403         if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
404             error!(
405                 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
406                 read_buf.len()
407             );
408             return None;
409         }
410         let mut query: virtio_snd_query_info = Default::default();
411         query.as_bytes_mut().copy_from_slice(read_buf);
412         let start_id = query.start_id.to_native();
413         let count = query.count.to_native();
414         Some((start_id, count))
415     }
416 
417     // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>418     fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
419         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
420             error!(
421                 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
422                 read_buf.len()
423                 );
424             return reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue);
425         }
426         let mut params: virtio_snd_pcm_set_params = Default::default();
427         params.as_bytes_mut().copy_from_slice(read_buf);
428         let stream_id = params.hdr.stream_id.to_native();
429         if stream_id < self.vios_client.lock().num_streams() {
430             self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
431         } else {
432             error!(
433                 "virtio-snd: Driver requested operation on invalid stream: {}",
434                 stream_id
435             );
436             reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue)
437         }
438     }
439 
440     // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>441     fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
442         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
443             error!(
444                 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
445                 read_buf.len()
446             );
447             return reply_control_op_status(
448                 VIRTIO_SND_S_BAD_MSG,
449                 match msg {
450                     StreamMsg::Prepare(d)
451                     | StreamMsg::Start(d)
452                     | StreamMsg::Stop(d)
453                     | StreamMsg::Release(d) => d,
454                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
455                 },
456                 &self.control_queue,
457             );
458         }
459         let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
460         pcm_hdr.as_bytes_mut().copy_from_slice(read_buf);
461         let stream_id = pcm_hdr.stream_id.to_native();
462         if stream_id < self.vios_client.lock().num_streams() {
463             self.streams[stream_id as usize].send(msg)
464         } else {
465             error!(
466                 "virtio-snd: Driver requested operation on invalid stream: {}",
467                 stream_id
468             );
469             reply_control_op_status(
470                 VIRTIO_SND_S_BAD_MSG,
471                 match msg {
472                     StreamMsg::Prepare(d)
473                     | StreamMsg::Start(d)
474                     | StreamMsg::Stop(d)
475                     | StreamMsg::Release(d) => d,
476                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
477                 },
478                 &self.control_queue,
479             )
480         }
481     }
482 
send_info_reply<T: AsBytes>( &mut self, mut desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>483     fn send_info_reply<T: AsBytes>(
484         &mut self,
485         mut desc: DescriptorChain,
486         code: u32,
487         info_vec: Vec<T>,
488     ) -> Result<()> {
489         let writer = &mut desc.writer;
490         writer
491             .write_obj(virtio_snd_hdr {
492                 code: Le32::from(code),
493             })
494             .map_err(SoundError::QueueIO)?;
495         for info in info_vec {
496             writer.write_obj(info).map_err(SoundError::QueueIO)?;
497         }
498         let len = writer.bytes_written() as u32;
499         {
500             let mut queue_lock = self.control_queue.lock();
501             queue_lock.add_used(desc, len);
502             queue_lock.trigger_interrupt();
503         }
504         Ok(())
505     }
506 }
507 
508 impl Drop for Worker {
drop(&mut self)509     fn drop(&mut self) {
510         self.stop_io_thread();
511     }
512 }
513 
io_loop( tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, senders: Vec<Sender<Box<StreamMsg>>>, kill_evt: Event, ) -> Result<()>514 fn io_loop(
515     tx_queue: Arc<Mutex<Queue>>,
516     rx_queue: Arc<Mutex<Queue>>,
517     senders: Vec<Sender<Box<StreamMsg>>>,
518     kill_evt: Event,
519 ) -> Result<()> {
520     #[derive(EventToken)]
521     enum Token {
522         TxQAvailable,
523         RxQAvailable,
524         Kill,
525     }
526     let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
527         (tx_queue.lock().event(), Token::TxQAvailable),
528         (rx_queue.lock().event(), Token::RxQAvailable),
529         (&kill_evt, Token::Kill),
530     ])
531     .map_err(SoundError::WaitCtx)?;
532 
533     'wait: loop {
534         let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
535         for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
536             let queue = match wait_evt.token {
537                 Token::TxQAvailable => {
538                     tx_queue
539                         .lock()
540                         .event()
541                         .wait()
542                         .map_err(SoundError::QueueEvt)?;
543                     &tx_queue
544                 }
545                 Token::RxQAvailable => {
546                     rx_queue
547                         .lock()
548                         .event()
549                         .wait()
550                         .map_err(SoundError::QueueEvt)?;
551                     &rx_queue
552                 }
553                 Token::Kill => {
554                     let _ = kill_evt.wait();
555                     break 'wait;
556                 }
557             };
558             while let Some(mut avail_desc) = lock_pop_unlock(queue) {
559                 let reader = &mut avail_desc.reader;
560                 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
561                 let stream_id = xfer.stream_id.to_native();
562                 if stream_id as usize >= senders.len() {
563                     error!(
564                         "virtio-snd: Driver sent buffer for invalid stream: {}",
565                         stream_id
566                     );
567                     reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue)?;
568                 } else {
569                     StreamProxy::send_msg(
570                         &senders[stream_id as usize],
571                         StreamMsg::Buffer(avail_desc),
572                     )?;
573                 }
574             }
575         }
576     }
577     Ok(())
578 }
579 
580 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
581 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
582 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain>583 fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {
584     queue.lock().pop()
585 }
586