1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use zerocopy::AsBytes;
18
19 use super::super::constants::*;
20 use super::super::layout::*;
21 use super::streams::*;
22 use super::Result;
23 use super::SoundError;
24 use super::*;
25 use crate::virtio::DescriptorChain;
26 use crate::virtio::Interrupt;
27 use crate::virtio::Queue;
28
29 pub struct Worker {
30 // Lock order: Must never hold more than one queue lock at the same time.
31 interrupt: Interrupt,
32 pub control_queue: Arc<Mutex<Queue>>,
33 pub event_queue: Option<Queue>,
34 vios_client: Arc<Mutex<VioSClient>>,
35 streams: Vec<StreamProxy>,
36 pub tx_queue: Arc<Mutex<Queue>>,
37 pub rx_queue: Arc<Mutex<Queue>>,
38 io_thread: Option<thread::JoinHandle<Result<()>>>,
39 io_kill: Event,
40 // saved_stream_state holds the previous state of streams. When the sound device is newly
41 // created, this will be empty. It will only contain state if the sound device is put to sleep
42 // OR if we restore a VM.
43 pub saved_stream_state: Vec<StreamSnapshot>,
44 }
45
46 impl Worker {
47 /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<Mutex<VioSClient>>, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, event_queue: Queue, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, saved_stream_state: Vec<StreamSnapshot>, ) -> Result<Worker>48 pub fn try_new(
49 vios_client: Arc<Mutex<VioSClient>>,
50 interrupt: Interrupt,
51 control_queue: Arc<Mutex<Queue>>,
52 event_queue: Queue,
53 tx_queue: Arc<Mutex<Queue>>,
54 rx_queue: Arc<Mutex<Queue>>,
55 saved_stream_state: Vec<StreamSnapshot>,
56 ) -> Result<Worker> {
57 let num_streams = vios_client.lock().num_streams();
58 let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);
59 {
60 for stream_id in 0..num_streams {
61 let capture = vios_client
62 .lock()
63 .stream_info(stream_id)
64 .map(|i| i.direction == VIRTIO_SND_D_INPUT)
65 .unwrap_or(false);
66 let io_queue = if capture { &rx_queue } else { &tx_queue };
67 streams.push(Stream::try_new(
68 stream_id,
69 vios_client.clone(),
70 control_queue.clone(),
71 io_queue.clone(),
72 capture,
73 saved_stream_state.get(stream_id as usize).cloned(),
74 )?);
75 }
76 }
77 let (self_kill_io, kill_io) = Event::new()
78 .and_then(|e| Ok((e.try_clone()?, e)))
79 .map_err(SoundError::CreateEvent)?;
80
81 let senders: Vec<Sender<Box<StreamMsg>>> =
82 streams.iter().map(|sp| sp.msg_sender().clone()).collect();
83 let tx_queue_thread = tx_queue.clone();
84 let rx_queue_thread = rx_queue.clone();
85 let io_thread = thread::Builder::new()
86 .name("v_snd_io".to_string())
87 .spawn(move || {
88 try_set_real_time_priority();
89
90 io_loop(tx_queue_thread, rx_queue_thread, senders, kill_io)
91 })
92 .map_err(SoundError::CreateThread)?;
93 Ok(Worker {
94 interrupt,
95 control_queue,
96 event_queue: Some(event_queue),
97 vios_client,
98 streams,
99 tx_queue,
100 rx_queue,
101 io_thread: Some(io_thread),
102 io_kill: self_kill_io,
103 saved_stream_state: Vec::new(),
104 })
105 }
106
107 /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
108 /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>109 pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
110 let event_notifier = self
111 .vios_client
112 .lock()
113 .get_event_notifier()
114 .map_err(SoundError::ClientEventNotifier)?;
115 #[derive(EventToken)]
116 enum Token {
117 ControlQAvailable,
118 EventQAvailable,
119 InterruptResample,
120 EventTriggered,
121 Kill,
122 }
123 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
124 (self.control_queue.lock().event(), Token::ControlQAvailable),
125 (
126 self.event_queue.as_ref().expect("queue missing").event(),
127 Token::EventQAvailable,
128 ),
129 (&event_notifier, Token::EventTriggered),
130 (&kill_evt, Token::Kill),
131 ])
132 .map_err(SoundError::WaitCtx)?;
133
134 if let Some(resample_evt) = self.interrupt.get_resample_evt() {
135 wait_ctx
136 .add(resample_evt, Token::InterruptResample)
137 .map_err(SoundError::WaitCtx)?;
138 }
139 let mut event_queue = self.event_queue.take().expect("event_queue missing");
140 'wait: loop {
141 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
142
143 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
144 match wait_evt.token {
145 Token::ControlQAvailable => {
146 self.control_queue
147 .lock()
148 .event()
149 .wait()
150 .map_err(SoundError::QueueEvt)?;
151 self.process_controlq_buffers()?;
152 }
153 Token::EventQAvailable => {
154 // Just read from the event object to make sure the producer of such events
155 // never blocks. The buffers will only be used when actual virtio-snd
156 // events are triggered.
157 event_queue.event().wait().map_err(SoundError::QueueEvt)?;
158 }
159 Token::EventTriggered => {
160 event_notifier.wait().map_err(SoundError::QueueEvt)?;
161 self.process_event_triggered(&mut event_queue)?;
162 }
163 Token::InterruptResample => {
164 self.interrupt.interrupt_resample();
165 }
166 Token::Kill => {
167 let _ = kill_evt.wait();
168 break 'wait;
169 }
170 }
171 }
172 }
173 self.saved_stream_state = self
174 .streams
175 .drain(..)
176 .map(|stream| stream.stop_thread())
177 .collect();
178 self.event_queue = Some(event_queue);
179 Ok(())
180 }
181
stop_io_thread(&mut self)182 fn stop_io_thread(&mut self) {
183 if let Err(e) = self.io_kill.signal() {
184 error!(
185 "virtio-snd: Failed to send Break msg to stream thread: {}",
186 e
187 );
188 }
189 if let Some(th) = self.io_thread.take() {
190 match th.join() {
191 Err(e) => {
192 error!("virtio-snd: Panic detected on stream thread: {:?}", e);
193 }
194 Ok(r) => {
195 if let Err(e) = r {
196 error!("virtio-snd: IO thread exited with and error: {}", e);
197 }
198 }
199 }
200 }
201 }
202
203 // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
204 // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>205 fn process_controlq_buffers(&mut self) -> Result<()> {
206 while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {
207 let reader = &mut avail_desc.reader;
208 let available_bytes = reader.available_bytes();
209 if available_bytes < std::mem::size_of::<virtio_snd_hdr>() {
210 error!(
211 "virtio-snd: Message received on control queue is too small: {}",
212 available_bytes
213 );
214 return reply_control_op_status(
215 VIRTIO_SND_S_BAD_MSG,
216 avail_desc,
217 &self.control_queue,
218 );
219 }
220 let mut read_buf = vec![0u8; available_bytes];
221 reader
222 .read_exact(&mut read_buf)
223 .map_err(SoundError::QueueIO)?;
224 let mut code: Le32 = Default::default();
225 // need to copy because the buffer may not be properly aligned
226 code.as_bytes_mut()
227 .copy_from_slice(&read_buf[..std::mem::size_of::<Le32>()]);
228 let request_type = code.to_native();
229 match request_type {
230 VIRTIO_SND_R_JACK_INFO => {
231 let (code, info_vec) = {
232 match self.parse_info_query(&read_buf) {
233 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
234 Some((start_id, count)) => {
235 let end_id = start_id.saturating_add(count);
236 if end_id > self.vios_client.lock().num_jacks() {
237 error!(
238 "virtio-snd: Requested info on invalid jacks ids: {}..{}",
239 start_id,
240 end_id - 1
241 );
242 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
243 } else {
244 (
245 VIRTIO_SND_S_OK,
246 // Safe to unwrap because we just ensured all the ids are
247 // valid
248 (start_id..end_id)
249 .map(|id| {
250 self.vios_client.lock().jack_info(id).unwrap()
251 })
252 .collect(),
253 )
254 }
255 }
256 }
257 };
258 self.send_info_reply(avail_desc, code, info_vec)?;
259 }
260 VIRTIO_SND_R_JACK_REMAP => {
261 let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
262 error!(
263 "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
264 read_buf.len()
265 );
266 VIRTIO_SND_S_BAD_MSG
267 } else {
268 let mut request: virtio_snd_jack_remap = Default::default();
269 request.as_bytes_mut().copy_from_slice(&read_buf);
270 let jack_id = request.hdr.jack_id.to_native();
271 let association = request.association.to_native();
272 let sequence = request.sequence.to_native();
273 if let Err(e) =
274 self.vios_client
275 .lock()
276 .remap_jack(jack_id, association, sequence)
277 {
278 error!("virtio-snd: Failed to remap jack: {}", e);
279 vios_error_to_status_code(e)
280 } else {
281 VIRTIO_SND_S_OK
282 }
283 };
284 let writer = &mut avail_desc.writer;
285 writer
286 .write_obj(virtio_snd_hdr {
287 code: Le32::from(code),
288 })
289 .map_err(SoundError::QueueIO)?;
290 let len = writer.bytes_written() as u32;
291 {
292 let mut queue_lock = self.control_queue.lock();
293 queue_lock.add_used(avail_desc, len);
294 queue_lock.trigger_interrupt();
295 }
296 }
297 VIRTIO_SND_R_CHMAP_INFO => {
298 let (code, info_vec) = {
299 match self.parse_info_query(&read_buf) {
300 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
301 Some((start_id, count)) => {
302 let end_id = start_id.saturating_add(count);
303 let num_chmaps = self.vios_client.lock().num_chmaps();
304 if end_id > num_chmaps {
305 error!(
306 "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
307 start_id,
308 end_id - 1
309 );
310 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
311 } else {
312 (
313 VIRTIO_SND_S_OK,
314 // Safe to unwrap because we just ensured all the ids are
315 // valid
316 (start_id..end_id)
317 .map(|id| {
318 self.vios_client.lock().chmap_info(id).unwrap()
319 })
320 .collect(),
321 )
322 }
323 }
324 }
325 };
326 self.send_info_reply(avail_desc, code, info_vec)?;
327 }
328 VIRTIO_SND_R_PCM_INFO => {
329 let (code, info_vec) = {
330 match self.parse_info_query(&read_buf) {
331 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
332 Some((start_id, count)) => {
333 let end_id = start_id.saturating_add(count);
334 if end_id > self.vios_client.lock().num_streams() {
335 error!(
336 "virtio-snd: Requested info on invalid stream ids: {}..{}",
337 start_id,
338 end_id - 1
339 );
340 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
341 } else {
342 (
343 VIRTIO_SND_S_OK,
344 // Safe to unwrap because we just ensured all the ids are
345 // valid
346 (start_id..end_id)
347 .map(|id| {
348 self.vios_client.lock().stream_info(id).unwrap()
349 })
350 .collect(),
351 )
352 }
353 }
354 }
355 };
356 self.send_info_reply(avail_desc, code, info_vec)?;
357 }
358 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
359 VIRTIO_SND_R_PCM_PREPARE => {
360 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
361 }
362 VIRTIO_SND_R_PCM_RELEASE => {
363 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
364 }
365 VIRTIO_SND_R_PCM_START => {
366 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
367 }
368 VIRTIO_SND_R_PCM_STOP => {
369 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
370 }
371 _ => {
372 error!(
373 "virtio-snd: Unknown control queue mesage code: {}",
374 request_type
375 );
376 reply_control_op_status(
377 VIRTIO_SND_S_NOT_SUPP,
378 avail_desc,
379 &self.control_queue,
380 )?;
381 }
382 }
383 }
384 Ok(())
385 }
386
process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()>387 fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {
388 while let Some(evt) = self.vios_client.lock().pop_event() {
389 if let Some(mut desc) = event_queue.pop() {
390 let writer = &mut desc.writer;
391 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
392 let len = writer.bytes_written() as u32;
393 event_queue.add_used(desc, len);
394 event_queue.trigger_interrupt();
395 } else {
396 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
397 }
398 }
399 Ok(())
400 }
401
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>402 fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
403 if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
404 error!(
405 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
406 read_buf.len()
407 );
408 return None;
409 }
410 let mut query: virtio_snd_query_info = Default::default();
411 query.as_bytes_mut().copy_from_slice(read_buf);
412 let start_id = query.start_id.to_native();
413 let count = query.count.to_native();
414 Some((start_id, count))
415 }
416
417 // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>418 fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
419 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
420 error!(
421 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
422 read_buf.len()
423 );
424 return reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue);
425 }
426 let mut params: virtio_snd_pcm_set_params = Default::default();
427 params.as_bytes_mut().copy_from_slice(read_buf);
428 let stream_id = params.hdr.stream_id.to_native();
429 if stream_id < self.vios_client.lock().num_streams() {
430 self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
431 } else {
432 error!(
433 "virtio-snd: Driver requested operation on invalid stream: {}",
434 stream_id
435 );
436 reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue)
437 }
438 }
439
440 // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>441 fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
442 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
443 error!(
444 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
445 read_buf.len()
446 );
447 return reply_control_op_status(
448 VIRTIO_SND_S_BAD_MSG,
449 match msg {
450 StreamMsg::Prepare(d)
451 | StreamMsg::Start(d)
452 | StreamMsg::Stop(d)
453 | StreamMsg::Release(d) => d,
454 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
455 },
456 &self.control_queue,
457 );
458 }
459 let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
460 pcm_hdr.as_bytes_mut().copy_from_slice(read_buf);
461 let stream_id = pcm_hdr.stream_id.to_native();
462 if stream_id < self.vios_client.lock().num_streams() {
463 self.streams[stream_id as usize].send(msg)
464 } else {
465 error!(
466 "virtio-snd: Driver requested operation on invalid stream: {}",
467 stream_id
468 );
469 reply_control_op_status(
470 VIRTIO_SND_S_BAD_MSG,
471 match msg {
472 StreamMsg::Prepare(d)
473 | StreamMsg::Start(d)
474 | StreamMsg::Stop(d)
475 | StreamMsg::Release(d) => d,
476 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
477 },
478 &self.control_queue,
479 )
480 }
481 }
482
send_info_reply<T: AsBytes>( &mut self, mut desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>483 fn send_info_reply<T: AsBytes>(
484 &mut self,
485 mut desc: DescriptorChain,
486 code: u32,
487 info_vec: Vec<T>,
488 ) -> Result<()> {
489 let writer = &mut desc.writer;
490 writer
491 .write_obj(virtio_snd_hdr {
492 code: Le32::from(code),
493 })
494 .map_err(SoundError::QueueIO)?;
495 for info in info_vec {
496 writer.write_obj(info).map_err(SoundError::QueueIO)?;
497 }
498 let len = writer.bytes_written() as u32;
499 {
500 let mut queue_lock = self.control_queue.lock();
501 queue_lock.add_used(desc, len);
502 queue_lock.trigger_interrupt();
503 }
504 Ok(())
505 }
506 }
507
508 impl Drop for Worker {
drop(&mut self)509 fn drop(&mut self) {
510 self.stop_io_thread();
511 }
512 }
513
io_loop( tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, senders: Vec<Sender<Box<StreamMsg>>>, kill_evt: Event, ) -> Result<()>514 fn io_loop(
515 tx_queue: Arc<Mutex<Queue>>,
516 rx_queue: Arc<Mutex<Queue>>,
517 senders: Vec<Sender<Box<StreamMsg>>>,
518 kill_evt: Event,
519 ) -> Result<()> {
520 #[derive(EventToken)]
521 enum Token {
522 TxQAvailable,
523 RxQAvailable,
524 Kill,
525 }
526 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
527 (tx_queue.lock().event(), Token::TxQAvailable),
528 (rx_queue.lock().event(), Token::RxQAvailable),
529 (&kill_evt, Token::Kill),
530 ])
531 .map_err(SoundError::WaitCtx)?;
532
533 'wait: loop {
534 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
535 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
536 let queue = match wait_evt.token {
537 Token::TxQAvailable => {
538 tx_queue
539 .lock()
540 .event()
541 .wait()
542 .map_err(SoundError::QueueEvt)?;
543 &tx_queue
544 }
545 Token::RxQAvailable => {
546 rx_queue
547 .lock()
548 .event()
549 .wait()
550 .map_err(SoundError::QueueEvt)?;
551 &rx_queue
552 }
553 Token::Kill => {
554 let _ = kill_evt.wait();
555 break 'wait;
556 }
557 };
558 while let Some(mut avail_desc) = lock_pop_unlock(queue) {
559 let reader = &mut avail_desc.reader;
560 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
561 let stream_id = xfer.stream_id.to_native();
562 if stream_id as usize >= senders.len() {
563 error!(
564 "virtio-snd: Driver sent buffer for invalid stream: {}",
565 stream_id
566 );
567 reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue)?;
568 } else {
569 StreamProxy::send_msg(
570 &senders[stream_id as usize],
571 StreamMsg::Buffer(avail_desc),
572 )?;
573 }
574 }
575 }
576 }
577 Ok(())
578 }
579
580 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
581 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
582 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain>583 fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {
584 queue.lock().pop()
585 }
586