1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10 use std::time::Duration;
11
12 use async_trait::async_trait;
13 use audio_streams::capture::AsyncCaptureBuffer;
14 use audio_streams::AsyncPlaybackBuffer;
15 use audio_streams::BoxError;
16 use base::debug;
17 use base::error;
18 use cros_async::sync::Condvar;
19 use cros_async::sync::RwLock as AsyncRwLock;
20 use cros_async::EventAsync;
21 use cros_async::Executor;
22 use cros_async::TimerAsync;
23 use futures::channel::mpsc;
24 use futures::channel::oneshot;
25 use futures::pin_mut;
26 use futures::select;
27 use futures::FutureExt;
28 use futures::SinkExt;
29 use futures::StreamExt;
30 use thiserror::Error as ThisError;
31 use zerocopy::AsBytes;
32
33 use super::Error;
34 use super::SndData;
35 use super::WorkerStatus;
36 use crate::virtio::snd::common::*;
37 use crate::virtio::snd::common_backend::stream_info::SetParams;
38 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
39 use crate::virtio::snd::common_backend::DirectionalStream;
40 use crate::virtio::snd::common_backend::PcmResponse;
41 use crate::virtio::snd::constants::*;
42 use crate::virtio::snd::layout::*;
43 use crate::virtio::DescriptorChain;
44 use crate::virtio::Queue;
45 use crate::virtio::Reader;
46 use crate::virtio::Writer;
47
48 /// Trait to wrap system specific helpers for reading from the start point capture buffer.
49 #[async_trait(?Send)]
50 pub trait CaptureBufferReader {
get_next_capture_period( &mut self, ex: &Executor, ) -> Result<AsyncCaptureBuffer, BoxError>51 async fn get_next_capture_period(
52 &mut self,
53 ex: &Executor,
54 ) -> Result<AsyncCaptureBuffer, BoxError>;
55 }
56
57 /// Trait to wrap system specific helpers for writing to endpoint playback buffers.
58 #[async_trait(?Send)]
59 pub trait PlaybackBufferWriter {
new(guest_period_bytes: usize) -> Self where Self: Sized60 fn new(guest_period_bytes: usize) -> Self
61 where
62 Self: Sized;
63
64 /// Returns the period of the endpoint device.
endpoint_period_bytes(&self) -> usize65 fn endpoint_period_bytes(&self) -> usize;
66
67 /// Read audio samples from the tx virtqueue.
copy_to_buffer( &mut self, dst_buf: &mut AsyncPlaybackBuffer<'_>, reader: &mut Reader, ) -> Result<usize, Error>68 fn copy_to_buffer(
69 &mut self,
70 dst_buf: &mut AsyncPlaybackBuffer<'_>,
71 reader: &mut Reader,
72 ) -> Result<usize, Error> {
73 dst_buf.copy_from(reader).map_err(Error::Io)
74 }
75 }
76
77 #[derive(Debug)]
78 enum VirtioSndPcmCmd {
79 SetParams { set_params: SetParams },
80 Prepare,
81 Start,
82 Stop,
83 Release,
84 }
85
86 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 let cmd_code = match self {
89 VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
90 VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
91 VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
92 VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
93 VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
94 };
95 f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
96 }
97 }
98
99 #[derive(ThisError, Debug)]
100 enum VirtioSndPcmCmdError {
101 #[error("SetParams requires additional parameters")]
102 SetParams,
103 #[error("Invalid virtio snd command code")]
104 InvalidCode,
105 }
106
107 impl TryFrom<u32> for VirtioSndPcmCmd {
108 type Error = VirtioSndPcmCmdError;
109
try_from(code: u32) -> Result<Self, Self::Error>110 fn try_from(code: u32) -> Result<Self, Self::Error> {
111 match code {
112 VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
113 VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
114 VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
115 VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
116 VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
117 _ => Err(VirtioSndPcmCmdError::InvalidCode),
118 }
119 }
120 }
121
122 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd123 fn with_set_params_and_direction(
124 set_params: virtio_snd_pcm_set_params,
125 dir: u8,
126 ) -> VirtioSndPcmCmd {
127 let buffer_bytes: u32 = set_params.buffer_bytes.into();
128 let period_bytes: u32 = set_params.period_bytes.into();
129 VirtioSndPcmCmd::SetParams {
130 set_params: SetParams {
131 channels: set_params.channels,
132 format: from_virtio_sample_format(set_params.format).unwrap(),
133 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
134 buffer_bytes: buffer_bytes as usize,
135 period_bytes: period_bytes as usize,
136 dir,
137 },
138 }
139 }
140 }
141
142 // Returns true if the operation is successful. Returns error if there is
143 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, card_index: usize, ) -> Result<(), Error>144 async fn process_pcm_ctrl(
145 ex: &Executor,
146 tx_send: &mpsc::UnboundedSender<PcmResponse>,
147 rx_send: &mpsc::UnboundedSender<PcmResponse>,
148 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
149 cmd: VirtioSndPcmCmd,
150 writer: &mut Writer,
151 stream_id: usize,
152 card_index: usize,
153 ) -> Result<(), Error> {
154 let streams = streams.read_lock().await;
155 let mut stream = match streams.get(stream_id) {
156 Some(stream_info) => stream_info.lock().await,
157 None => {
158 error!(
159 "[Card {}] Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
160 card_index, stream_id, cmd
161 );
162 return writer
163 .write_obj(VIRTIO_SND_S_BAD_MSG)
164 .map_err(Error::WriteResponse);
165 }
166 };
167
168 debug!("[Card {}] {} for stream id={}", card_index, cmd, stream_id);
169
170 let result = match cmd {
171 VirtioSndPcmCmd::SetParams { set_params } => {
172 let result = stream.set_params(set_params).await;
173 if result.is_ok() {
174 debug!(
175 "[Card {}] VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
176 card_index, stream_id, *stream
177 );
178 }
179 result
180 }
181 VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,
182 VirtioSndPcmCmd::Start => stream.start().await,
183 VirtioSndPcmCmd::Stop => stream.stop().await,
184 VirtioSndPcmCmd::Release => stream.release().await,
185 };
186 match result {
187 Ok(_) => writer
188 .write_obj(VIRTIO_SND_S_OK)
189 .map_err(Error::WriteResponse),
190 Err(Error::OperationNotSupported) => {
191 error!(
192 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
193 card_index, cmd, stream_id
194 );
195
196 writer
197 .write_obj(VIRTIO_SND_S_NOT_SUPP)
198 .map_err(Error::WriteResponse)
199 }
200 Err(e) => {
201 // Runtime/internal error would be more appropriate, but there's
202 // no such error type
203 error!(
204 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
205 card_index, cmd, stream_id, e
206 );
207 writer
208 .write_obj(VIRTIO_SND_S_IO_ERR)
209 .map_err(Error::WriteResponse)
210 }
211 }
212 }
213
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<&mut Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>214 async fn write_data(
215 mut dst_buf: AsyncPlaybackBuffer<'_>,
216 reader: Option<&mut Reader>,
217 buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
218 ) -> Result<u32, Error> {
219 let transferred = match reader {
220 Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,
221 None => dst_buf
222 .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
223 .map_err(Error::Io)?,
224 };
225
226 if transferred != buffer_writer.endpoint_period_bytes() {
227 error!(
228 "Bytes written {} != period_bytes {}",
229 transferred,
230 buffer_writer.endpoint_period_bytes()
231 );
232 Err(Error::InvalidBufferSize)
233 } else {
234 dst_buf.commit().await;
235 Ok(dst_buf.latency_bytes())
236 }
237 }
238
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>239 async fn read_data<'a>(
240 mut src_buf: AsyncCaptureBuffer<'a>,
241 writer: Option<&mut Writer>,
242 period_bytes: usize,
243 ) -> Result<u32, Error> {
244 let transferred = match writer {
245 Some(writer) => src_buf.copy_to(writer),
246 None => src_buf.copy_to(&mut io::sink()),
247 }
248 .map_err(Error::Io)?;
249 if transferred != period_bytes {
250 error!(
251 "Bytes written {} != period_bytes {}",
252 transferred, period_bytes
253 );
254 Err(Error::InvalidBufferSize)
255 } else {
256 src_buf.commit().await;
257 Ok(src_buf.latency_bytes())
258 }
259 }
260
261 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self262 fn from(res: Result<u32, Error>) -> Self {
263 match res {
264 Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
265 Err(e) => {
266 error!("PCM I/O message failed: {}", e);
267 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
268 }
269 }
270 }
271 }
272
273 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>274 async fn drain_desc_receiver(
275 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
276 sender: &mut mpsc::UnboundedSender<PcmResponse>,
277 ) -> Result<(), Error> {
278 let mut o_desc_chain = desc_receiver.next().await;
279 while let Some(desc_chain) = o_desc_chain {
280 // From the virtio-snd spec:
281 // The device MUST complete all pending I/O messages for the specified stream ID.
282 let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
283 // Fetch next DescriptorChain to see if the current one is the last one.
284 o_desc_chain = desc_receiver.next().await;
285 let (done, future) = if o_desc_chain.is_none() {
286 let (done, future) = oneshot::channel();
287 (Some(done), Some(future))
288 } else {
289 (None, None)
290 };
291 sender
292 .send(PcmResponse {
293 desc_chain,
294 status,
295 done,
296 })
297 .await
298 .map_err(Error::MpscSend)?;
299
300 if let Some(f) = future {
301 // From the virtio-snd spec:
302 // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
303 // while there are pending I/O messages for the specified stream ID.
304 f.await.map_err(Error::DoneNotTriggered)?;
305 };
306 }
307 Ok(())
308 }
309
310 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
311 /// queue, and forward them to CRAS. One pcm worker per stream.
312 ///
313 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
314 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncRwLock<WorkerStatus>>, mut sender: mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, card_index: usize, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>315 pub async fn start_pcm_worker(
316 ex: Executor,
317 dstream: DirectionalStream,
318 mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
319 status_mutex: Rc<AsyncRwLock<WorkerStatus>>,
320 mut sender: mpsc::UnboundedSender<PcmResponse>,
321 period_dur: Duration,
322 card_index: usize,
323 release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
324 ) -> Result<(), Error> {
325 let res = pcm_worker_loop(
326 ex,
327 dstream,
328 &mut desc_receiver,
329 &status_mutex,
330 &mut sender,
331 period_dur,
332 card_index,
333 release_signal,
334 )
335 .await;
336 *status_mutex.lock().await = WorkerStatus::Quit;
337 if res.is_err() {
338 error!(
339 "[Card {}] pcm_worker error: {:#?}. Draining desc_receiver",
340 card_index,
341 res.as_ref().err()
342 );
343 // On error, guaranteed that desc_receiver has not been drained, so drain it here.
344 // Note that drain blocks until the stream is release.
345 drain_desc_receiver(&mut desc_receiver, &mut sender).await?;
346 }
347 res
348 }
349
pcm_worker_loop( ex: Executor, dstream: DirectionalStream, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: &Rc<AsyncRwLock<WorkerStatus>>, sender: &mut mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, card_index: usize, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>350 async fn pcm_worker_loop(
351 ex: Executor,
352 dstream: DirectionalStream,
353 desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
354 status_mutex: &Rc<AsyncRwLock<WorkerStatus>>,
355 sender: &mut mpsc::UnboundedSender<PcmResponse>,
356 period_dur: Duration,
357 card_index: usize,
358 release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
359 ) -> Result<(), Error> {
360 let on_release = async {
361 await_reset_signal(Some(&*release_signal)).await;
362 // After receiving release signal, wait for up to 2 periods,
363 // giving it a chance to respond to the last buffer.
364 if let Err(e) = TimerAsync::sleep(&ex, period_dur * 2).await {
365 error!(
366 "[Card {}] Error on sleep after receiving reset signal: {}",
367 card_index, e
368 )
369 }
370 }
371 .fuse();
372 pin_mut!(on_release);
373
374 match dstream {
375 DirectionalStream::Output(mut sys_direction_output) => loop {
376 #[cfg(windows)]
377 let (mut stream, mut buffer_writer_lock) = (
378 sys_direction_output
379 .async_playback_buffer_stream
380 .lock()
381 .await,
382 sys_direction_output.buffer_writer.lock().await,
383 );
384 #[cfg(windows)]
385 let buffer_writer = &mut buffer_writer_lock;
386 #[cfg(any(target_os = "android", target_os = "linux"))]
387 let (stream, buffer_writer) = (
388 &mut sys_direction_output.async_playback_buffer_stream,
389 &mut sys_direction_output.buffer_writer,
390 );
391
392 let next_buf = stream.next_playback_buffer(&ex).fuse();
393 pin_mut!(next_buf);
394
395 let dst_buf = select! {
396 _ = on_release => {
397 drain_desc_receiver(desc_receiver, sender).await?;
398 break Ok(());
399 },
400 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
401 };
402 let worker_status = status_mutex.lock().await;
403 match *worker_status {
404 WorkerStatus::Quit => {
405 drain_desc_receiver(desc_receiver, sender).await?;
406 if let Err(e) = write_data(dst_buf, None, buffer_writer).await {
407 error!(
408 "[Card {}] Error on write_data after worker quit: {}",
409 card_index, e
410 )
411 }
412 break Ok(());
413 }
414 WorkerStatus::Pause => {
415 write_data(dst_buf, None, buffer_writer).await?;
416 }
417 WorkerStatus::Running => match desc_receiver.try_next() {
418 Err(e) => {
419 error!(
420 "[Card {}] Underrun. No new DescriptorChain while running: {}",
421 card_index, e
422 );
423 write_data(dst_buf, None, buffer_writer).await?;
424 }
425 Ok(None) => {
426 error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
427 write_data(dst_buf, None, buffer_writer).await?;
428 return Err(Error::InvalidPCMWorkerState);
429 }
430 Ok(Some(mut desc_chain)) => {
431 // stream_id was already read in handle_pcm_queue
432 let status =
433 write_data(dst_buf, Some(&mut desc_chain.reader), buffer_writer)
434 .await
435 .into();
436 sender
437 .send(PcmResponse {
438 desc_chain,
439 status,
440 done: None,
441 })
442 .await
443 .map_err(Error::MpscSend)?;
444 }
445 },
446 }
447 },
448 DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {
449 let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
450 pin_mut!(next_buf);
451
452 let src_buf = select! {
453 _ = on_release => {
454 drain_desc_receiver(desc_receiver, sender).await?;
455 break Ok(());
456 },
457 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
458 };
459
460 let worker_status = status_mutex.lock().await;
461 match *worker_status {
462 WorkerStatus::Quit => {
463 drain_desc_receiver(desc_receiver, sender).await?;
464 if let Err(e) = read_data(src_buf, None, period_bytes).await {
465 error!(
466 "[Card {}] Error on read_data after worker quit: {}",
467 card_index, e
468 )
469 }
470 break Ok(());
471 }
472 WorkerStatus::Pause => {
473 read_data(src_buf, None, period_bytes).await?;
474 }
475 WorkerStatus::Running => match desc_receiver.try_next() {
476 Err(e) => {
477 error!(
478 "[Card {}] Overrun. No new DescriptorChain while running: {}",
479 card_index, e
480 );
481 read_data(src_buf, None, period_bytes).await?;
482 }
483 Ok(None) => {
484 error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
485 read_data(src_buf, None, period_bytes).await?;
486 return Err(Error::InvalidPCMWorkerState);
487 }
488 Ok(Some(mut desc_chain)) => {
489 let status = read_data(src_buf, Some(&mut desc_chain.writer), period_bytes)
490 .await
491 .into();
492 sender
493 .send(PcmResponse {
494 desc_chain,
495 status,
496 done: None,
497 })
498 .await
499 .map_err(Error::MpscSend)?;
500 }
501 },
502 }
503 },
504 }
505 }
506
507 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>508 async fn defer_pcm_response_to_worker(
509 desc_chain: DescriptorChain,
510 status: virtio_snd_pcm_status,
511 response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
512 ) -> Result<(), Error> {
513 response_sender
514 .send(PcmResponse {
515 desc_chain,
516 status,
517 done: None,
518 })
519 .await
520 .map_err(Error::MpscSend)
521 }
522
send_pcm_response( mut desc_chain: DescriptorChain, queue: &mut Queue, status: virtio_snd_pcm_status, ) -> Result<(), Error>523 fn send_pcm_response(
524 mut desc_chain: DescriptorChain,
525 queue: &mut Queue,
526 status: virtio_snd_pcm_status,
527 ) -> Result<(), Error> {
528 let writer = &mut desc_chain.writer;
529
530 // For rx queue only. Fast forward the unused audio data buffer.
531 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
532 writer
533 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
534 }
535 writer.write_obj(status).map_err(Error::WriteResponse)?;
536 let len = writer.bytes_written() as u32;
537 queue.add_used(desc_chain, len);
538 queue.trigger_interrupt();
539 Ok(())
540 }
541
542 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>)543 async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>) {
544 match reset_signal_option {
545 Some((lock, cvar)) => {
546 let mut reset = lock.lock().await;
547 while !*reset {
548 reset = cvar.wait(reset).await;
549 }
550 }
551 None => futures::future::pending().await,
552 };
553 }
554
send_pcm_response_worker( queue: Rc<AsyncRwLock<Queue>>, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>555 pub async fn send_pcm_response_worker(
556 queue: Rc<AsyncRwLock<Queue>>,
557 recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
558 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
559 ) -> Result<(), Error> {
560 let on_reset = await_reset_signal(reset_signal).fuse();
561 pin_mut!(on_reset);
562
563 loop {
564 let next_async = recv.next().fuse();
565 pin_mut!(next_async);
566
567 let res = select! {
568 _ = on_reset => break,
569 res = next_async => res,
570 };
571
572 if let Some(r) = res {
573 send_pcm_response(r.desc_chain, &mut *queue.lock().await, r.status)?;
574
575 // Resume pcm_worker
576 if let Some(done) = r.done {
577 done.send(()).map_err(Error::OneshotSend)?;
578 }
579 } else {
580 debug!("PcmResponse channel is closed.");
581 break;
582 }
583 }
584 Ok(())
585 }
586
587 /// Handle messages from the tx or the rx queue. One invocation is needed for
588 /// each queue.
handle_pcm_queue( streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: Rc<AsyncRwLock<Queue>>, queue_event: &EventAsync, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>589 pub async fn handle_pcm_queue(
590 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
591 mut response_sender: mpsc::UnboundedSender<PcmResponse>,
592 queue: Rc<AsyncRwLock<Queue>>,
593 queue_event: &EventAsync,
594 card_index: usize,
595 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
596 ) -> Result<(), Error> {
597 let on_reset = await_reset_signal(reset_signal).fuse();
598 pin_mut!(on_reset);
599
600 loop {
601 // Manual queue.next_async() to avoid holding the mutex
602 let next_async = async {
603 loop {
604 // Check if there are more descriptors available.
605 if let Some(chain) = queue.lock().await.pop() {
606 return Ok(chain);
607 }
608 queue_event.next_val().await?;
609 }
610 }
611 .fuse();
612 pin_mut!(next_async);
613
614 let mut desc_chain = select! {
615 _ = on_reset => break,
616 res = next_async => res.map_err(Error::Async)?,
617 };
618
619 let pcm_xfer: virtio_snd_pcm_xfer =
620 desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;
621 let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
622
623 let streams = streams.read_lock().await;
624 let stream_info = match streams.get(stream_id) {
625 Some(stream_info) => stream_info.read_lock().await,
626 None => {
627 error!(
628 "[Card {}] stream_id ({}) >= num_streams ({})",
629 card_index,
630 stream_id,
631 streams.len()
632 );
633 defer_pcm_response_to_worker(
634 desc_chain,
635 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
636 &mut response_sender,
637 )
638 .await?;
639 continue;
640 }
641 };
642
643 match stream_info.sender.as_ref() {
644 Some(mut s) => {
645 s.send(desc_chain).await.map_err(Error::MpscSend)?;
646 if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
647 // If sender channel is still intact but worker status is quit,
648 // the worker quitted unexpectedly. Return error to request a reset.
649 return Err(Error::PCMWorkerQuittedUnexpectedly);
650 }
651 }
652 None => {
653 if !stream_info.just_reset {
654 error!(
655 "[Card {}] stream {} is not ready. state: {}",
656 card_index,
657 stream_id,
658 get_virtio_snd_r_pcm_cmd_name(stream_info.state)
659 );
660 }
661 defer_pcm_response_to_worker(
662 desc_chain,
663 virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
664 &mut response_sender,
665 )
666 .await?;
667 }
668 };
669 }
670 Ok(())
671 }
672
673 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue( ex: &Executor, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, snd_data: &SndData, queue: Rc<AsyncRwLock<Queue>>, queue_event: &mut EventAsync, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>674 pub async fn handle_ctrl_queue(
675 ex: &Executor,
676 streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
677 snd_data: &SndData,
678 queue: Rc<AsyncRwLock<Queue>>,
679 queue_event: &mut EventAsync,
680 tx_send: mpsc::UnboundedSender<PcmResponse>,
681 rx_send: mpsc::UnboundedSender<PcmResponse>,
682 card_index: usize,
683 reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
684 ) -> Result<(), Error> {
685 let on_reset = await_reset_signal(reset_signal).fuse();
686 pin_mut!(on_reset);
687
688 let mut queue = queue.lock().await;
689 loop {
690 let mut desc_chain = {
691 let next_async = queue.next_async(queue_event).fuse();
692 pin_mut!(next_async);
693
694 select! {
695 _ = on_reset => break,
696 res = next_async => res.map_err(Error::Async)?,
697 }
698 };
699
700 let reader = &mut desc_chain.reader;
701 let writer = &mut desc_chain.writer;
702 // Don't advance the reader
703 let code = reader
704 .peek_obj::<virtio_snd_hdr>()
705 .map_err(Error::ReadMessage)?
706 .code
707 .into();
708
709 let handle_ctrl_msg = async {
710 return match code {
711 VIRTIO_SND_R_JACK_INFO => {
712 let query_info: virtio_snd_query_info =
713 reader.read_obj().map_err(Error::ReadMessage)?;
714 let start_id: usize = u32::from(query_info.start_id) as usize;
715 let count: usize = u32::from(query_info.count) as usize;
716 if start_id + count > snd_data.jack_info.len() {
717 error!(
718 "[Card {}] start_id({}) + count({}) must be smaller than \
719 the number of jacks ({})",
720 card_index,
721 start_id,
722 count,
723 snd_data.jack_info.len()
724 );
725 return writer
726 .write_obj(VIRTIO_SND_S_BAD_MSG)
727 .map_err(Error::WriteResponse);
728 }
729 // The response consists of the virtio_snd_hdr structure (contains the request
730 // status code), followed by the device-writable information structures of the
731 // item. Each information structure begins with the following common header
732 writer
733 .write_obj(VIRTIO_SND_S_OK)
734 .map_err(Error::WriteResponse)?;
735 for i in start_id..(start_id + count) {
736 writer
737 .write_all(snd_data.jack_info[i].as_bytes())
738 .map_err(Error::WriteResponse)?;
739 }
740 Ok(())
741 }
742 VIRTIO_SND_R_PCM_INFO => {
743 let query_info: virtio_snd_query_info =
744 reader.read_obj().map_err(Error::ReadMessage)?;
745 let start_id: usize = u32::from(query_info.start_id) as usize;
746 let count: usize = u32::from(query_info.count) as usize;
747 if start_id + count > snd_data.pcm_info.len() {
748 error!(
749 "[Card {}] start_id({}) + count({}) must be smaller than \
750 the number of streams ({})",
751 card_index,
752 start_id,
753 count,
754 snd_data.pcm_info.len()
755 );
756 return writer
757 .write_obj(VIRTIO_SND_S_BAD_MSG)
758 .map_err(Error::WriteResponse);
759 }
760 // The response consists of the virtio_snd_hdr structure (contains the request
761 // status code), followed by the device-writable information structures of the
762 // item. Each information structure begins with the following common header
763 writer
764 .write_obj(VIRTIO_SND_S_OK)
765 .map_err(Error::WriteResponse)?;
766 for i in start_id..(start_id + count) {
767 writer
768 .write_all(snd_data.pcm_info[i].as_bytes())
769 .map_err(Error::WriteResponse)?;
770 }
771 Ok(())
772 }
773 VIRTIO_SND_R_CHMAP_INFO => {
774 let query_info: virtio_snd_query_info =
775 reader.read_obj().map_err(Error::ReadMessage)?;
776 let start_id: usize = u32::from(query_info.start_id) as usize;
777 let count: usize = u32::from(query_info.count) as usize;
778 if start_id + count > snd_data.chmap_info.len() {
779 error!(
780 "[Card {}] start_id({}) + count({}) must be smaller than \
781 the number of chmaps ({})",
782 card_index,
783 start_id,
784 count,
785 snd_data.chmap_info.len()
786 );
787 return writer
788 .write_obj(VIRTIO_SND_S_BAD_MSG)
789 .map_err(Error::WriteResponse);
790 }
791 // The response consists of the virtio_snd_hdr structure (contains the request
792 // status code), followed by the device-writable information structures of the
793 // item. Each information structure begins with the following common header
794 writer
795 .write_obj(VIRTIO_SND_S_OK)
796 .map_err(Error::WriteResponse)?;
797 for i in start_id..(start_id + count) {
798 writer
799 .write_all(snd_data.chmap_info[i].as_bytes())
800 .map_err(Error::WriteResponse)?;
801 }
802 Ok(())
803 }
804 VIRTIO_SND_R_JACK_REMAP => {
805 unreachable!("remap is unsupported");
806 }
807 VIRTIO_SND_R_PCM_SET_PARAMS => {
808 // Raise VIRTIO_SND_S_BAD_MSG or IO error?
809 let set_params: virtio_snd_pcm_set_params =
810 reader.read_obj().map_err(Error::ReadMessage)?;
811 let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
812 let buffer_bytes: u32 = set_params.buffer_bytes.into();
813 let period_bytes: u32 = set_params.period_bytes.into();
814
815 let dir = match snd_data.pcm_info.get(stream_id) {
816 Some(pcm_info) => {
817 if set_params.channels < pcm_info.channels_min
818 || set_params.channels > pcm_info.channels_max
819 {
820 error!(
821 "[Card {}] Number of channels ({}) must be between {} and {}",
822 card_index,
823 set_params.channels,
824 pcm_info.channels_min,
825 pcm_info.channels_max
826 );
827 return writer
828 .write_obj(VIRTIO_SND_S_NOT_SUPP)
829 .map_err(Error::WriteResponse);
830 }
831 if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
832 error!(
833 "[Card {}] PCM format {} is not supported.",
834 card_index, set_params.format
835 );
836 return writer
837 .write_obj(VIRTIO_SND_S_NOT_SUPP)
838 .map_err(Error::WriteResponse);
839 }
840 if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
841 error!(
842 "[Card {}] PCM frame rate {} is not supported.",
843 card_index, set_params.rate
844 );
845 return writer
846 .write_obj(VIRTIO_SND_S_NOT_SUPP)
847 .map_err(Error::WriteResponse);
848 }
849
850 pcm_info.direction
851 }
852 None => {
853 error!(
854 "[Card {}] stream_id {} < streams {}",
855 card_index,
856 stream_id,
857 snd_data.pcm_info.len()
858 );
859 return writer
860 .write_obj(VIRTIO_SND_S_BAD_MSG)
861 .map_err(Error::WriteResponse);
862 }
863 };
864
865 if set_params.features != 0 {
866 error!("[Card {}] No feature is supported", card_index);
867 return writer
868 .write_obj(VIRTIO_SND_S_NOT_SUPP)
869 .map_err(Error::WriteResponse);
870 }
871
872 if buffer_bytes % period_bytes != 0 {
873 error!(
874 "[Card {}] buffer_bytes({}) must be dividable by period_bytes({})",
875 card_index, buffer_bytes, period_bytes
876 );
877 return writer
878 .write_obj(VIRTIO_SND_S_BAD_MSG)
879 .map_err(Error::WriteResponse);
880 }
881
882 process_pcm_ctrl(
883 ex,
884 &tx_send,
885 &rx_send,
886 streams,
887 VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
888 writer,
889 stream_id,
890 card_index,
891 )
892 .await
893 }
894 VIRTIO_SND_R_PCM_PREPARE
895 | VIRTIO_SND_R_PCM_START
896 | VIRTIO_SND_R_PCM_STOP
897 | VIRTIO_SND_R_PCM_RELEASE => {
898 let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
899 let stream_id: usize = u32::from(hdr.stream_id) as usize;
900 let cmd = match VirtioSndPcmCmd::try_from(code) {
901 Ok(cmd) => cmd,
902 Err(err) => {
903 error!(
904 "[Card {}] Error converting code to command: {}",
905 card_index, err
906 );
907 return writer
908 .write_obj(VIRTIO_SND_S_BAD_MSG)
909 .map_err(Error::WriteResponse);
910 }
911 };
912 process_pcm_ctrl(
913 ex, &tx_send, &rx_send, streams, cmd, writer, stream_id, card_index,
914 )
915 .await
916 .and(Ok(()))?;
917 Ok(())
918 }
919 c => {
920 error!("[Card {}] Unrecognized code: {}", card_index, c);
921 return writer
922 .write_obj(VIRTIO_SND_S_BAD_MSG)
923 .map_err(Error::WriteResponse);
924 }
925 };
926 };
927
928 handle_ctrl_msg.await?;
929 let len = writer.bytes_written() as u32;
930 queue.add_used(desc_chain, len);
931 queue.trigger_interrupt();
932 }
933 Ok(())
934 }
935
936 /// Send events to the audio driver.
handle_event_queue( mut queue: Queue, mut queue_event: EventAsync, ) -> Result<(), Error>937 pub async fn handle_event_queue(
938 mut queue: Queue,
939 mut queue_event: EventAsync,
940 ) -> Result<(), Error> {
941 loop {
942 let desc_chain = queue
943 .next_async(&mut queue_event)
944 .await
945 .map_err(Error::Async)?;
946
947 // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
948 queue.add_used(desc_chain, 0);
949 queue.trigger_interrupt();
950 }
951 }
952