xref: /aosp_15_r20/external/crosvm/devices/src/virtio/snd/vios_backend/streams.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::Receiver;
8 use std::sync::mpsc::Sender;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 use std::time::Instant;
13 
14 use base::error;
15 use base::set_rt_prio_limit;
16 use base::set_rt_round_robin;
17 use base::warn;
18 use data_model::Le32;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 
23 use super::Error as VioSError;
24 use super::Result;
25 use super::SoundError;
26 use super::*;
27 use crate::virtio::snd::common::from_virtio_frame_rate;
28 use crate::virtio::snd::constants::*;
29 use crate::virtio::snd::layout::*;
30 use crate::virtio::DescriptorChain;
31 use crate::virtio::Queue;
32 
33 /// Messages that the worker can send to the stream (thread).
34 pub enum StreamMsg {
35     SetParams(DescriptorChain, virtio_snd_pcm_set_params),
36     Prepare(DescriptorChain),
37     Start(DescriptorChain),
38     Stop(DescriptorChain),
39     Release(DescriptorChain),
40     Buffer(DescriptorChain),
41     Break,
42 }
43 
44 #[derive(Clone, Serialize, Deserialize)]
45 pub enum StreamState {
46     New,
47     ParamsSet,
48     Prepared,
49     Started,
50     Stopped,
51     Released,
52 }
53 
54 pub struct Stream {
55     stream_id: u32,
56     receiver: Receiver<Box<StreamMsg>>,
57     vios_client: Arc<Mutex<VioSClient>>,
58     control_queue: Arc<Mutex<Queue>>,
59     io_queue: Arc<Mutex<Queue>>,
60     capture: bool,
61     current_state: StreamState,
62     period: Duration,
63     start_time: Instant,
64     next_buffer: Duration,
65     buffer_queue: VecDeque<DescriptorChain>,
66 }
67 
68 #[derive(Clone, Serialize, Deserialize)]
69 pub struct StreamSnapshot {
70     pub current_state: StreamState,
71     pub period: Duration,
72     pub next_buffer: Duration,
73 }
74 
75 impl Stream {
76     /// Start a new stream thread and return its handler.
try_new( stream_id: u32, vios_client: Arc<Mutex<VioSClient>>, control_queue: Arc<Mutex<Queue>>, io_queue: Arc<Mutex<Queue>>, capture: bool, stream_state: Option<StreamSnapshot>, ) -> Result<StreamProxy>77     pub fn try_new(
78         stream_id: u32,
79         vios_client: Arc<Mutex<VioSClient>>,
80         control_queue: Arc<Mutex<Queue>>,
81         io_queue: Arc<Mutex<Queue>>,
82         capture: bool,
83         stream_state: Option<StreamSnapshot>,
84     ) -> Result<StreamProxy> {
85         let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();
86         let thread = thread::Builder::new()
87             .name(format!("v_snd_stream:{stream_id}"))
88             .spawn(move || {
89                 try_set_real_time_priority();
90                 let (current_state, period, next_buffer) =
91                     if let Some(stream_state) = stream_state.clone() {
92                         (
93                             stream_state.current_state,
94                             stream_state.period,
95                             stream_state.next_buffer,
96                         )
97                     } else {
98                         (
99                             StreamState::New,
100                             Duration::from_millis(0),
101                             Duration::from_millis(0),
102                         )
103                     };
104 
105                 let mut stream = Stream {
106                     stream_id,
107                     receiver,
108                     vios_client: vios_client.clone(),
109                     control_queue,
110                     io_queue,
111                     capture,
112                     current_state,
113                     period,
114                     start_time: Instant::now(),
115                     next_buffer,
116                     buffer_queue: VecDeque::new(),
117                 };
118 
119                 if let Some(stream_state) = stream_state {
120                     if let Err(e) = vios_client
121                         .lock()
122                         .restore_stream(stream_id, stream_state.current_state)
123                     {
124                         error!("failed to restore stream params: {}", e);
125                     };
126                 }
127                 if let Err(e) = stream.stream_loop() {
128                     error!("virtio-snd: Error in stream {}: {}", stream_id, e);
129                 }
130                 let state = stream.current_state.clone();
131                 StreamSnapshot {
132                     current_state: state,
133                     period: stream.period,
134                     next_buffer: stream.next_buffer,
135                 }
136             })
137             .map_err(SoundError::CreateThread)?;
138         Ok(StreamProxy {
139             sender,
140             thread: Some(thread),
141         })
142     }
143 
stream_loop(&mut self) -> Result<()>144     fn stream_loop(&mut self) -> Result<()> {
145         loop {
146             if !self.recv_msg()? {
147                 break;
148             }
149             self.maybe_process_queued_buffers()?;
150         }
151         Ok(())
152     }
153 
recv_msg(&mut self) -> Result<bool>154     fn recv_msg(&mut self) -> Result<bool> {
155         let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
156         let (code, desc, next_state) = match *msg {
157             StreamMsg::SetParams(desc, params) => {
158                 let code = match self.vios_client.lock().set_stream_parameters_raw(params) {
159                     Ok(()) => {
160                         let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
161                         self.period = Duration::from_nanos(
162                             (params.period_bytes.to_native() as u64 * 1_000_000_000u64)
163                                 / frame_rate
164                                 / params.channels as u64
165                                 / bytes_per_sample(params.format) as u64,
166                         );
167                         VIRTIO_SND_S_OK
168                     }
169                     Err(e) => {
170                         error!(
171                             "virtio-snd: Error setting parameters for stream {}: {}",
172                             self.stream_id, e
173                         );
174                         vios_error_to_status_code(e)
175                     }
176                 };
177                 (code, desc, StreamState::ParamsSet)
178             }
179             StreamMsg::Prepare(desc) => {
180                 let code = match self.vios_client.lock().prepare_stream(self.stream_id) {
181                     Ok(()) => VIRTIO_SND_S_OK,
182                     Err(e) => {
183                         error!(
184                             "virtio-snd: Failed to prepare stream {}: {}",
185                             self.stream_id, e
186                         );
187                         vios_error_to_status_code(e)
188                     }
189                 };
190                 (code, desc, StreamState::Prepared)
191             }
192             StreamMsg::Start(desc) => {
193                 let code = match self.vios_client.lock().start_stream(self.stream_id) {
194                     Ok(()) => VIRTIO_SND_S_OK,
195                     Err(e) => {
196                         error!(
197                             "virtio-snd: Failed to start stream {}: {}",
198                             self.stream_id, e
199                         );
200                         vios_error_to_status_code(e)
201                     }
202                 };
203                 self.start_time = Instant::now();
204                 self.next_buffer = Duration::from_millis(0);
205                 (code, desc, StreamState::Started)
206             }
207             StreamMsg::Stop(desc) => {
208                 let code = match self.vios_client.lock().stop_stream(self.stream_id) {
209                     Ok(()) => VIRTIO_SND_S_OK,
210                     Err(e) => {
211                         error!(
212                             "virtio-snd: Failed to stop stream {}: {}",
213                             self.stream_id, e
214                         );
215                         vios_error_to_status_code(e)
216                     }
217                 };
218                 (code, desc, StreamState::Stopped)
219             }
220             StreamMsg::Release(desc) => {
221                 let code = match self.vios_client.lock().release_stream(self.stream_id) {
222                     Ok(()) => VIRTIO_SND_S_OK,
223                     Err(e) => {
224                         error!(
225                             "virtio-snd: Failed to release stream {}: {}",
226                             self.stream_id, e
227                         );
228                         vios_error_to_status_code(e)
229                     }
230                 };
231                 (code, desc, StreamState::Released)
232             }
233             StreamMsg::Buffer(d) => {
234                 // Buffers may arrive while in several states:
235                 // - Prepared: Buffer should be queued and played when start cmd arrives
236                 // - Started: Buffer should be processed immediately
237                 // - Stopped: Buffer should be returned to the guest immediately
238                 // Because we may need to wait to process the buffer, we always queue it and
239                 // decide what to do with queued buffers after every message.
240                 self.buffer_queue.push_back(d);
241                 // return here to avoid replying on control queue below
242                 return Ok(true);
243             }
244             StreamMsg::Break => {
245                 return Ok(false);
246             }
247         };
248         reply_control_op_status(code, desc, &self.control_queue)?;
249         self.current_state = next_state;
250         Ok(true)
251     }
252 
maybe_process_queued_buffers(&mut self) -> Result<()>253     fn maybe_process_queued_buffers(&mut self) -> Result<()> {
254         match self.current_state {
255             StreamState::Started => {
256                 while let Some(mut desc) = self.buffer_queue.pop_front() {
257                     let reader = &mut desc.reader;
258                     // Ignore the first buffer, it was already read by the time this thread
259                     // receives the descriptor
260                     reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
261                     let writer = &mut desc.writer;
262                     let io_res = if self.capture {
263                         let buffer_size =
264                             writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
265                         self.vios_client.lock().request_audio_data(
266                             self.stream_id,
267                             buffer_size,
268                             |vslice| writer.write_from_volatile_slice(*vslice),
269                         )
270                     } else {
271                         self.vios_client.lock().inject_audio_data(
272                             self.stream_id,
273                             reader.available_bytes(),
274                             |vslice| reader.read_to_volatile_slice(vslice),
275                         )
276                     };
277                     let (code, latency) = match io_res {
278                         Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
279                         Err(e) => {
280                             error!(
281                                 "virtio-snd: Failed IO operation in stream {}: {}",
282                                 self.stream_id, e
283                             );
284                             (VIRTIO_SND_S_IO_ERR, 0)
285                         }
286                     };
287                     if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
288                         status: Le32::from(code),
289                         latency_bytes: Le32::from(latency),
290                     }) {
291                         error!(
292                             "virtio-snd: Failed to write pcm status from stream {} thread: {}",
293                             self.stream_id, e
294                         );
295                     }
296 
297                     self.next_buffer += self.period;
298                     let elapsed = self.start_time.elapsed();
299                     if elapsed < self.next_buffer {
300                         // Completing an IO request can be considered an elapsed period
301                         // notification by the driver, so we must wait the right amount of time to
302                         // release the buffer if the sound server client returned too soon.
303                         std::thread::sleep(self.next_buffer - elapsed);
304                     }
305                     let len = writer.bytes_written() as u32;
306                     {
307                         let mut io_queue_lock = self.io_queue.lock();
308                         io_queue_lock.add_used(desc, len);
309                         io_queue_lock.trigger_interrupt();
310                     }
311                 }
312             }
313             StreamState::Stopped | StreamState::Released => {
314                 // For some reason playback buffers can arrive after stop and release (maybe because
315                 // buffer-ready notifications arrive over eventfds and those are processed in
316                 // random order?). The spec requires the device to not confirm the release of a
317                 // stream until all IO buffers have been released, but that's impossible to
318                 // guarantee if a buffer arrives after release is requested. Luckily it seems to
319                 // work fine if the buffer is released after the release command is completed.
320                 while let Some(desc) = self.buffer_queue.pop_front() {
321                     reply_pcm_buffer_status(VIRTIO_SND_S_OK, 0, desc, &self.io_queue)?;
322                 }
323             }
324             StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
325             _ => {
326                 if !self.buffer_queue.is_empty() {
327                     warn!("virtio-snd: Buffers received while in unexpected state");
328                 }
329             }
330         }
331         Ok(())
332     }
333 }
334 
335 impl Drop for Stream {
drop(&mut self)336     fn drop(&mut self) {
337         // Try to stop and release the stream in case it was playing, these operations will fail if
338         // the stream is already released, just ignore that failure
339         let _ = self.vios_client.lock().stop_stream(self.stream_id);
340         let _ = self.vios_client.lock().release_stream(self.stream_id);
341 
342         // Also release any pending buffer
343         while let Some(desc) = self.buffer_queue.pop_front() {
344             if let Err(e) = reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, desc, &self.io_queue) {
345                 error!(
346                     "virtio-snd: Failed to reply buffer on stream {}: {}",
347                     self.stream_id, e
348                 );
349             }
350         }
351     }
352 }
353 
354 /// Basically a proxy to the thread handling a particular stream.
355 pub struct StreamProxy {
356     sender: Sender<Box<StreamMsg>>,
357     thread: Option<thread::JoinHandle<StreamSnapshot>>,
358 }
359 
360 impl StreamProxy {
361     /// Access the underlying sender to clone it or send messages
msg_sender(&self) -> &Sender<Box<StreamMsg>>362     pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {
363         &self.sender
364     }
365 
366     /// Send a message to the stream thread on the other side of this sender
send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()>367     pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {
368         sender
369             .send(Box::new(msg))
370             .map_err(SoundError::StreamThreadSend)
371     }
372 
373     /// Convenience function to send a message to this stream's thread
send(&self, msg: StreamMsg) -> Result<()>374     pub fn send(&self, msg: StreamMsg) -> Result<()> {
375         Self::send_msg(&self.sender, msg)
376     }
377 
stop_thread(mut self) -> StreamSnapshot378     pub fn stop_thread(mut self) -> StreamSnapshot {
379         self.stop_thread_inner().unwrap()
380     }
381 
stop_thread_inner(&mut self) -> Option<StreamSnapshot>382     fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {
383         if let Some(th) = self.thread.take() {
384             if let Err(e) = self.send(StreamMsg::Break) {
385                 error!(
386                     "virtio-snd: Failed to send Break msg to stream thread: {}",
387                     e
388                 );
389             }
390             match th.join() {
391                 Ok(state) => Some(state),
392                 Err(e) => panic!("virtio-snd: Panic detected on stream thread: {:?}", e),
393             }
394         } else {
395             None
396         }
397     }
398 }
399 
400 impl Drop for StreamProxy {
drop(&mut self)401     fn drop(&mut self) {
402         let _ = self.stop_thread_inner();
403     }
404 }
405 
406 /// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
407 /// may fail due to insuficient permissions.
try_set_real_time_priority()408 pub fn try_set_real_time_priority() {
409     const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
410     if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
411         .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
412     {
413         warn!("Failed to set audio stream thread to real time: {}", e);
414     }
415 }
416 
417 /// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
vios_error_to_status_code(e: VioSError) -> u32418 pub fn vios_error_to_status_code(e: VioSError) -> u32 {
419     match e {
420         VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
421         _ => VIRTIO_SND_S_NOT_SUPP,
422     }
423 }
424 
425 /// Encapsulates sending the virtio_snd_hdr struct back to the driver.
reply_control_op_status( code: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, ) -> Result<()>426 pub fn reply_control_op_status(
427     code: u32,
428     mut desc: DescriptorChain,
429     queue: &Arc<Mutex<Queue>>,
430 ) -> Result<()> {
431     let writer = &mut desc.writer;
432     writer
433         .write_obj(virtio_snd_hdr {
434             code: Le32::from(code),
435         })
436         .map_err(SoundError::QueueIO)?;
437     let len = writer.bytes_written() as u32;
438     {
439         let mut queue_lock = queue.lock();
440         queue_lock.add_used(desc, len);
441         queue_lock.trigger_interrupt();
442     }
443     Ok(())
444 }
445 
446 /// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
reply_pcm_buffer_status( status: u32, latency_bytes: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, ) -> Result<()>447 pub fn reply_pcm_buffer_status(
448     status: u32,
449     latency_bytes: u32,
450     mut desc: DescriptorChain,
451     queue: &Arc<Mutex<Queue>>,
452 ) -> Result<()> {
453     let writer = &mut desc.writer;
454     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
455         writer
456             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
457     }
458     writer
459         .write_obj(virtio_snd_pcm_status {
460             status: Le32::from(status),
461             latency_bytes: Le32::from(latency_bytes),
462         })
463         .map_err(SoundError::QueueIO)?;
464     let len = writer.bytes_written() as u32;
465     {
466         let mut queue_lock = queue.lock();
467         queue_lock.add_used(desc, len);
468         queue_lock.trigger_interrupt();
469     }
470     Ok(())
471 }
472 
bytes_per_sample(format: u8) -> usize473 fn bytes_per_sample(format: u8) -> usize {
474     match format {
475         VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
476         VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
477         VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
478         VIRTIO_SND_PCM_FMT_S8 => 1usize,
479         VIRTIO_SND_PCM_FMT_U8 => 1usize,
480         VIRTIO_SND_PCM_FMT_S16 => 2usize,
481         VIRTIO_SND_PCM_FMT_U16 => 2usize,
482         VIRTIO_SND_PCM_FMT_S32 => 4usize,
483         VIRTIO_SND_PCM_FMT_U32 => 4usize,
484         VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
485         VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
486         // VIRTIO_SND_PCM_FMT_DSD_U8
487         // VIRTIO_SND_PCM_FMT_DSD_U16
488         // VIRTIO_SND_PCM_FMT_DSD_U32
489         // VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
490         // VIRTIO_SND_PCM_FMT_S18_3
491         // VIRTIO_SND_PCM_FMT_U18_3
492         // VIRTIO_SND_PCM_FMT_S20_3
493         // VIRTIO_SND_PCM_FMT_U20_3
494         // VIRTIO_SND_PCM_FMT_S24_3
495         // VIRTIO_SND_PCM_FMT_U24_3
496         // VIRTIO_SND_PCM_FMT_S20
497         // VIRTIO_SND_PCM_FMT_U20
498         // VIRTIO_SND_PCM_FMT_S24
499         // VIRTIO_SND_PCM_FMT_U24
500         _ => {
501             // Some of these formats are not consistently stored in a particular size (24bits is
502             // sometimes stored in a 32bit word) while others are of variable size.
503             // The size per sample estimated here is designed to greatly underestimate the time it
504             // takes to play a buffer and depend instead on timings provided by the sound server if
505             // it supports these formats.
506             warn!(
507                 "Unknown sample size for format {}, depending on sound server timing instead.",
508                 format
509             );
510             1000usize
511         }
512     }
513 }
514