1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::collections::VecDeque;
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::Receiver;
8 use std::sync::mpsc::Sender;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 use std::time::Instant;
13
14 use base::error;
15 use base::set_rt_prio_limit;
16 use base::set_rt_round_robin;
17 use base::warn;
18 use data_model::Le32;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22
23 use super::Error as VioSError;
24 use super::Result;
25 use super::SoundError;
26 use super::*;
27 use crate::virtio::snd::common::from_virtio_frame_rate;
28 use crate::virtio::snd::constants::*;
29 use crate::virtio::snd::layout::*;
30 use crate::virtio::DescriptorChain;
31 use crate::virtio::Queue;
32
/// Messages that the worker can send to the stream (thread).
///
/// Every variant except `Break` carries the descriptor chain of the guest request that the
/// stream thread has to complete once the message has been handled.
pub enum StreamMsg {
    /// Apply the given PCM parameters to the stream; the status is replied on the control queue.
    SetParams(DescriptorChain, virtio_snd_pcm_set_params),
    /// Prepare the stream; the status is replied on the control queue.
    Prepare(DescriptorChain),
    /// Start the stream; the status is replied on the control queue.
    Start(DescriptorChain),
    /// Stop the stream; the status is replied on the control queue.
    Stop(DescriptorChain),
    /// Release the stream; the status is replied on the control queue.
    Release(DescriptorChain),
    /// An IO buffer for the stream. It is queued by the stream thread and completed on the IO
    /// queue once the stream state allows it.
    Buffer(DescriptorChain),
    /// Makes the stream thread leave its message loop.
    Break,
}
43
/// State of a stream as tracked by its thread.
///
/// The stream thread moves to the state matching the last control message it replied to; the
/// state also decides what happens to queued IO buffers.
#[derive(Clone, Serialize, Deserialize)]
pub enum StreamState {
    /// Initial state of a stream that was not restored from a snapshot.
    New,
    /// A `StreamMsg::SetParams` message was handled.
    ParamsSet,
    /// A `StreamMsg::Prepare` message was handled; buffers are kept queued.
    Prepared,
    /// A `StreamMsg::Start` message was handled; queued buffers are processed.
    Started,
    /// A `StreamMsg::Stop` message was handled; queued buffers are returned to the guest.
    Stopped,
    /// A `StreamMsg::Release` message was handled; queued buffers are returned to the guest.
    Released,
}
53
/// Per-stream state owned by the stream thread.
pub struct Stream {
    /// Id passed to the sound server client for every operation on this stream.
    stream_id: u32,
    /// Receiving end of the channel on which `StreamMsg`s arrive.
    receiver: Receiver<Box<StreamMsg>>,
    /// Shared sound server client used to carry out the stream operations.
    vios_client: Arc<Mutex<VioSClient>>,
    /// Queue on which the status of control operations is returned.
    control_queue: Arc<Mutex<Queue>>,
    /// Queue on which IO buffers are returned.
    io_queue: Arc<Mutex<Queue>>,
    /// When true, audio data is requested from the client and written into the guest buffers;
    /// otherwise the content of the guest buffers is injected into the client.
    capture: bool,
    /// State the stream is currently in.
    current_state: StreamState,
    /// Duration of one buffer period, computed from the parameters set on the stream.
    period: Duration,
    /// Set when the stream struct is created and again on every start message.
    start_time: Instant,
    /// Offset from `start_time` before which the buffer being processed is not released.
    next_buffer: Duration,
    /// Buffers received from the worker that have not been completed yet.
    buffer_queue: VecDeque<DescriptorChain>,
}
67
/// Serializable subset of a stream's state.
///
/// It is returned by the stream thread when it finishes and can be handed to
/// `Stream::try_new` to start a stream from that state.
#[derive(Clone, Serialize, Deserialize)]
pub struct StreamSnapshot {
    /// State the stream was in.
    pub current_state: StreamState,
    /// Duration of one buffer period.
    pub period: Duration,
    /// Offset from the stream's start time at which the next buffer is due.
    pub next_buffer: Duration,
}
74
75 impl Stream {
76 /// Start a new stream thread and return its handler.
try_new( stream_id: u32, vios_client: Arc<Mutex<VioSClient>>, control_queue: Arc<Mutex<Queue>>, io_queue: Arc<Mutex<Queue>>, capture: bool, stream_state: Option<StreamSnapshot>, ) -> Result<StreamProxy>77 pub fn try_new(
78 stream_id: u32,
79 vios_client: Arc<Mutex<VioSClient>>,
80 control_queue: Arc<Mutex<Queue>>,
81 io_queue: Arc<Mutex<Queue>>,
82 capture: bool,
83 stream_state: Option<StreamSnapshot>,
84 ) -> Result<StreamProxy> {
85 let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();
86 let thread = thread::Builder::new()
87 .name(format!("v_snd_stream:{stream_id}"))
88 .spawn(move || {
89 try_set_real_time_priority();
90 let (current_state, period, next_buffer) =
91 if let Some(stream_state) = stream_state.clone() {
92 (
93 stream_state.current_state,
94 stream_state.period,
95 stream_state.next_buffer,
96 )
97 } else {
98 (
99 StreamState::New,
100 Duration::from_millis(0),
101 Duration::from_millis(0),
102 )
103 };
104
105 let mut stream = Stream {
106 stream_id,
107 receiver,
108 vios_client: vios_client.clone(),
109 control_queue,
110 io_queue,
111 capture,
112 current_state,
113 period,
114 start_time: Instant::now(),
115 next_buffer,
116 buffer_queue: VecDeque::new(),
117 };
118
119 if let Some(stream_state) = stream_state {
120 if let Err(e) = vios_client
121 .lock()
122 .restore_stream(stream_id, stream_state.current_state)
123 {
124 error!("failed to restore stream params: {}", e);
125 };
126 }
127 if let Err(e) = stream.stream_loop() {
128 error!("virtio-snd: Error in stream {}: {}", stream_id, e);
129 }
130 let state = stream.current_state.clone();
131 StreamSnapshot {
132 current_state: state,
133 period: stream.period,
134 next_buffer: stream.next_buffer,
135 }
136 })
137 .map_err(SoundError::CreateThread)?;
138 Ok(StreamProxy {
139 sender,
140 thread: Some(thread),
141 })
142 }
143
stream_loop(&mut self) -> Result<()>144 fn stream_loop(&mut self) -> Result<()> {
145 loop {
146 if !self.recv_msg()? {
147 break;
148 }
149 self.maybe_process_queued_buffers()?;
150 }
151 Ok(())
152 }
153
recv_msg(&mut self) -> Result<bool>154 fn recv_msg(&mut self) -> Result<bool> {
155 let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
156 let (code, desc, next_state) = match *msg {
157 StreamMsg::SetParams(desc, params) => {
158 let code = match self.vios_client.lock().set_stream_parameters_raw(params) {
159 Ok(()) => {
160 let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
161 self.period = Duration::from_nanos(
162 (params.period_bytes.to_native() as u64 * 1_000_000_000u64)
163 / frame_rate
164 / params.channels as u64
165 / bytes_per_sample(params.format) as u64,
166 );
167 VIRTIO_SND_S_OK
168 }
169 Err(e) => {
170 error!(
171 "virtio-snd: Error setting parameters for stream {}: {}",
172 self.stream_id, e
173 );
174 vios_error_to_status_code(e)
175 }
176 };
177 (code, desc, StreamState::ParamsSet)
178 }
179 StreamMsg::Prepare(desc) => {
180 let code = match self.vios_client.lock().prepare_stream(self.stream_id) {
181 Ok(()) => VIRTIO_SND_S_OK,
182 Err(e) => {
183 error!(
184 "virtio-snd: Failed to prepare stream {}: {}",
185 self.stream_id, e
186 );
187 vios_error_to_status_code(e)
188 }
189 };
190 (code, desc, StreamState::Prepared)
191 }
192 StreamMsg::Start(desc) => {
193 let code = match self.vios_client.lock().start_stream(self.stream_id) {
194 Ok(()) => VIRTIO_SND_S_OK,
195 Err(e) => {
196 error!(
197 "virtio-snd: Failed to start stream {}: {}",
198 self.stream_id, e
199 );
200 vios_error_to_status_code(e)
201 }
202 };
203 self.start_time = Instant::now();
204 self.next_buffer = Duration::from_millis(0);
205 (code, desc, StreamState::Started)
206 }
207 StreamMsg::Stop(desc) => {
208 let code = match self.vios_client.lock().stop_stream(self.stream_id) {
209 Ok(()) => VIRTIO_SND_S_OK,
210 Err(e) => {
211 error!(
212 "virtio-snd: Failed to stop stream {}: {}",
213 self.stream_id, e
214 );
215 vios_error_to_status_code(e)
216 }
217 };
218 (code, desc, StreamState::Stopped)
219 }
220 StreamMsg::Release(desc) => {
221 let code = match self.vios_client.lock().release_stream(self.stream_id) {
222 Ok(()) => VIRTIO_SND_S_OK,
223 Err(e) => {
224 error!(
225 "virtio-snd: Failed to release stream {}: {}",
226 self.stream_id, e
227 );
228 vios_error_to_status_code(e)
229 }
230 };
231 (code, desc, StreamState::Released)
232 }
233 StreamMsg::Buffer(d) => {
234 // Buffers may arrive while in several states:
235 // - Prepared: Buffer should be queued and played when start cmd arrives
236 // - Started: Buffer should be processed immediately
237 // - Stopped: Buffer should be returned to the guest immediately
238 // Because we may need to wait to process the buffer, we always queue it and
239 // decide what to do with queued buffers after every message.
240 self.buffer_queue.push_back(d);
241 // return here to avoid replying on control queue below
242 return Ok(true);
243 }
244 StreamMsg::Break => {
245 return Ok(false);
246 }
247 };
248 reply_control_op_status(code, desc, &self.control_queue)?;
249 self.current_state = next_state;
250 Ok(true)
251 }
252
maybe_process_queued_buffers(&mut self) -> Result<()>253 fn maybe_process_queued_buffers(&mut self) -> Result<()> {
254 match self.current_state {
255 StreamState::Started => {
256 while let Some(mut desc) = self.buffer_queue.pop_front() {
257 let reader = &mut desc.reader;
258 // Ignore the first buffer, it was already read by the time this thread
259 // receives the descriptor
260 reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
261 let writer = &mut desc.writer;
262 let io_res = if self.capture {
263 let buffer_size =
264 writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
265 self.vios_client.lock().request_audio_data(
266 self.stream_id,
267 buffer_size,
268 |vslice| writer.write_from_volatile_slice(*vslice),
269 )
270 } else {
271 self.vios_client.lock().inject_audio_data(
272 self.stream_id,
273 reader.available_bytes(),
274 |vslice| reader.read_to_volatile_slice(vslice),
275 )
276 };
277 let (code, latency) = match io_res {
278 Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
279 Err(e) => {
280 error!(
281 "virtio-snd: Failed IO operation in stream {}: {}",
282 self.stream_id, e
283 );
284 (VIRTIO_SND_S_IO_ERR, 0)
285 }
286 };
287 if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
288 status: Le32::from(code),
289 latency_bytes: Le32::from(latency),
290 }) {
291 error!(
292 "virtio-snd: Failed to write pcm status from stream {} thread: {}",
293 self.stream_id, e
294 );
295 }
296
297 self.next_buffer += self.period;
298 let elapsed = self.start_time.elapsed();
299 if elapsed < self.next_buffer {
300 // Completing an IO request can be considered an elapsed period
301 // notification by the driver, so we must wait the right amount of time to
302 // release the buffer if the sound server client returned too soon.
303 std::thread::sleep(self.next_buffer - elapsed);
304 }
305 let len = writer.bytes_written() as u32;
306 {
307 let mut io_queue_lock = self.io_queue.lock();
308 io_queue_lock.add_used(desc, len);
309 io_queue_lock.trigger_interrupt();
310 }
311 }
312 }
313 StreamState::Stopped | StreamState::Released => {
314 // For some reason playback buffers can arrive after stop and release (maybe because
315 // buffer-ready notifications arrive over eventfds and those are processed in
316 // random order?). The spec requires the device to not confirm the release of a
317 // stream until all IO buffers have been released, but that's impossible to
318 // guarantee if a buffer arrives after release is requested. Luckily it seems to
319 // work fine if the buffer is released after the release command is completed.
320 while let Some(desc) = self.buffer_queue.pop_front() {
321 reply_pcm_buffer_status(VIRTIO_SND_S_OK, 0, desc, &self.io_queue)?;
322 }
323 }
324 StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
325 _ => {
326 if !self.buffer_queue.is_empty() {
327 warn!("virtio-snd: Buffers received while in unexpected state");
328 }
329 }
330 }
331 Ok(())
332 }
333 }
334
335 impl Drop for Stream {
drop(&mut self)336 fn drop(&mut self) {
337 // Try to stop and release the stream in case it was playing, these operations will fail if
338 // the stream is already released, just ignore that failure
339 let _ = self.vios_client.lock().stop_stream(self.stream_id);
340 let _ = self.vios_client.lock().release_stream(self.stream_id);
341
342 // Also release any pending buffer
343 while let Some(desc) = self.buffer_queue.pop_front() {
344 if let Err(e) = reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, desc, &self.io_queue) {
345 error!(
346 "virtio-snd: Failed to reply buffer on stream {}: {}",
347 self.stream_id, e
348 );
349 }
350 }
351 }
352 }
353
/// Basically a proxy to the thread handling a particular stream.
///
/// Dropping the proxy stops the stream thread and waits for it to finish.
pub struct StreamProxy {
    /// Sending end of the channel read by the stream thread.
    sender: Sender<Box<StreamMsg>>,
    /// Handle of the stream thread; `None` once the thread has been stopped and joined.
    thread: Option<thread::JoinHandle<StreamSnapshot>>,
}
359
360 impl StreamProxy {
361 /// Access the underlying sender to clone it or send messages
msg_sender(&self) -> &Sender<Box<StreamMsg>>362 pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {
363 &self.sender
364 }
365
366 /// Send a message to the stream thread on the other side of this sender
send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()>367 pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {
368 sender
369 .send(Box::new(msg))
370 .map_err(SoundError::StreamThreadSend)
371 }
372
373 /// Convenience function to send a message to this stream's thread
send(&self, msg: StreamMsg) -> Result<()>374 pub fn send(&self, msg: StreamMsg) -> Result<()> {
375 Self::send_msg(&self.sender, msg)
376 }
377
stop_thread(mut self) -> StreamSnapshot378 pub fn stop_thread(mut self) -> StreamSnapshot {
379 self.stop_thread_inner().unwrap()
380 }
381
stop_thread_inner(&mut self) -> Option<StreamSnapshot>382 fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {
383 if let Some(th) = self.thread.take() {
384 if let Err(e) = self.send(StreamMsg::Break) {
385 error!(
386 "virtio-snd: Failed to send Break msg to stream thread: {}",
387 e
388 );
389 }
390 match th.join() {
391 Ok(state) => Some(state),
392 Err(e) => panic!("virtio-snd: Panic detected on stream thread: {:?}", e),
393 }
394 } else {
395 None
396 }
397 }
398 }
399
400 impl Drop for StreamProxy {
drop(&mut self)401 fn drop(&mut self) {
402 let _ = self.stop_thread_inner();
403 }
404 }
405
406 /// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
407 /// may fail due to insuficient permissions.
try_set_real_time_priority()408 pub fn try_set_real_time_priority() {
409 const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
410 if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
411 .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
412 {
413 warn!("Failed to set audio stream thread to real time: {}", e);
414 }
415 }
416
417 /// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
vios_error_to_status_code(e: VioSError) -> u32418 pub fn vios_error_to_status_code(e: VioSError) -> u32 {
419 match e {
420 VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
421 _ => VIRTIO_SND_S_NOT_SUPP,
422 }
423 }
424
425 /// Encapsulates sending the virtio_snd_hdr struct back to the driver.
reply_control_op_status( code: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, ) -> Result<()>426 pub fn reply_control_op_status(
427 code: u32,
428 mut desc: DescriptorChain,
429 queue: &Arc<Mutex<Queue>>,
430 ) -> Result<()> {
431 let writer = &mut desc.writer;
432 writer
433 .write_obj(virtio_snd_hdr {
434 code: Le32::from(code),
435 })
436 .map_err(SoundError::QueueIO)?;
437 let len = writer.bytes_written() as u32;
438 {
439 let mut queue_lock = queue.lock();
440 queue_lock.add_used(desc, len);
441 queue_lock.trigger_interrupt();
442 }
443 Ok(())
444 }
445
446 /// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
reply_pcm_buffer_status( status: u32, latency_bytes: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, ) -> Result<()>447 pub fn reply_pcm_buffer_status(
448 status: u32,
449 latency_bytes: u32,
450 mut desc: DescriptorChain,
451 queue: &Arc<Mutex<Queue>>,
452 ) -> Result<()> {
453 let writer = &mut desc.writer;
454 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
455 writer
456 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
457 }
458 writer
459 .write_obj(virtio_snd_pcm_status {
460 status: Le32::from(status),
461 latency_bytes: Le32::from(latency_bytes),
462 })
463 .map_err(SoundError::QueueIO)?;
464 let len = writer.bytes_written() as u32;
465 {
466 let mut queue_lock = queue.lock();
467 queue_lock.add_used(desc, len);
468 queue_lock.trigger_interrupt();
469 }
470 Ok(())
471 }
472
bytes_per_sample(format: u8) -> usize473 fn bytes_per_sample(format: u8) -> usize {
474 match format {
475 VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
476 VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
477 VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
478 VIRTIO_SND_PCM_FMT_S8 => 1usize,
479 VIRTIO_SND_PCM_FMT_U8 => 1usize,
480 VIRTIO_SND_PCM_FMT_S16 => 2usize,
481 VIRTIO_SND_PCM_FMT_U16 => 2usize,
482 VIRTIO_SND_PCM_FMT_S32 => 4usize,
483 VIRTIO_SND_PCM_FMT_U32 => 4usize,
484 VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
485 VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
486 // VIRTIO_SND_PCM_FMT_DSD_U8
487 // VIRTIO_SND_PCM_FMT_DSD_U16
488 // VIRTIO_SND_PCM_FMT_DSD_U32
489 // VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
490 // VIRTIO_SND_PCM_FMT_S18_3
491 // VIRTIO_SND_PCM_FMT_U18_3
492 // VIRTIO_SND_PCM_FMT_S20_3
493 // VIRTIO_SND_PCM_FMT_U20_3
494 // VIRTIO_SND_PCM_FMT_S24_3
495 // VIRTIO_SND_PCM_FMT_U24_3
496 // VIRTIO_SND_PCM_FMT_S20
497 // VIRTIO_SND_PCM_FMT_U20
498 // VIRTIO_SND_PCM_FMT_S24
499 // VIRTIO_SND_PCM_FMT_U24
500 _ => {
501 // Some of these formats are not consistently stored in a particular size (24bits is
502 // sometimes stored in a 32bit word) while others are of variable size.
503 // The size per sample estimated here is designed to greatly underestimate the time it
504 // takes to play a buffer and depend instead on timings provided by the sound server if
505 // it supports these formats.
506 warn!(
507 "Unknown sample size for format {}, depending on sound server timing instead.",
508 format
509 );
510 1000usize
511 }
512 }
513 }
514