xref: /aosp_15_r20/external/crosvm/devices/src/virtio/snd/common_backend/async_funcs.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fmt;
6 use std::io;
7 use std::io::Read;
8 use std::io::Write;
9 use std::rc::Rc;
10 use std::time::Duration;
11 
12 use async_trait::async_trait;
13 use audio_streams::capture::AsyncCaptureBuffer;
14 use audio_streams::AsyncPlaybackBuffer;
15 use audio_streams::BoxError;
16 use base::debug;
17 use base::error;
18 use cros_async::sync::Condvar;
19 use cros_async::sync::RwLock as AsyncRwLock;
20 use cros_async::EventAsync;
21 use cros_async::Executor;
22 use cros_async::TimerAsync;
23 use futures::channel::mpsc;
24 use futures::channel::oneshot;
25 use futures::pin_mut;
26 use futures::select;
27 use futures::FutureExt;
28 use futures::SinkExt;
29 use futures::StreamExt;
30 use thiserror::Error as ThisError;
31 use zerocopy::AsBytes;
32 
33 use super::Error;
34 use super::SndData;
35 use super::WorkerStatus;
36 use crate::virtio::snd::common::*;
37 use crate::virtio::snd::common_backend::stream_info::SetParams;
38 use crate::virtio::snd::common_backend::stream_info::StreamInfo;
39 use crate::virtio::snd::common_backend::DirectionalStream;
40 use crate::virtio::snd::common_backend::PcmResponse;
41 use crate::virtio::snd::constants::*;
42 use crate::virtio::snd::layout::*;
43 use crate::virtio::DescriptorChain;
44 use crate::virtio::Queue;
45 use crate::virtio::Reader;
46 use crate::virtio::Writer;
47 
48 /// Trait to wrap system specific helpers for reading from the start point capture buffer.
49 #[async_trait(?Send)]
50 pub trait CaptureBufferReader {
get_next_capture_period( &mut self, ex: &Executor, ) -> Result<AsyncCaptureBuffer, BoxError>51     async fn get_next_capture_period(
52         &mut self,
53         ex: &Executor,
54     ) -> Result<AsyncCaptureBuffer, BoxError>;
55 }
56 
57 /// Trait to wrap system specific helpers for writing to endpoint playback buffers.
58 #[async_trait(?Send)]
59 pub trait PlaybackBufferWriter {
new(guest_period_bytes: usize) -> Self where Self: Sized60     fn new(guest_period_bytes: usize) -> Self
61     where
62         Self: Sized;
63 
64     /// Returns the period of the endpoint device.
endpoint_period_bytes(&self) -> usize65     fn endpoint_period_bytes(&self) -> usize;
66 
67     /// Read audio samples from the tx virtqueue.
copy_to_buffer( &mut self, dst_buf: &mut AsyncPlaybackBuffer<'_>, reader: &mut Reader, ) -> Result<usize, Error>68     fn copy_to_buffer(
69         &mut self,
70         dst_buf: &mut AsyncPlaybackBuffer<'_>,
71         reader: &mut Reader,
72     ) -> Result<usize, Error> {
73         dst_buf.copy_from(reader).map_err(Error::Io)
74     }
75 }
76 
77 #[derive(Debug)]
78 enum VirtioSndPcmCmd {
79     SetParams { set_params: SetParams },
80     Prepare,
81     Start,
82     Stop,
83     Release,
84 }
85 
86 impl fmt::Display for VirtioSndPcmCmd {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result87     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88         let cmd_code = match self {
89             VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,
90             VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,
91             VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,
92             VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,
93             VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,
94         };
95         f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))
96     }
97 }
98 
99 #[derive(ThisError, Debug)]
100 enum VirtioSndPcmCmdError {
101     #[error("SetParams requires additional parameters")]
102     SetParams,
103     #[error("Invalid virtio snd command code")]
104     InvalidCode,
105 }
106 
107 impl TryFrom<u32> for VirtioSndPcmCmd {
108     type Error = VirtioSndPcmCmdError;
109 
try_from(code: u32) -> Result<Self, Self::Error>110     fn try_from(code: u32) -> Result<Self, Self::Error> {
111         match code {
112             VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),
113             VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),
114             VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),
115             VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),
116             VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),
117             _ => Err(VirtioSndPcmCmdError::InvalidCode),
118         }
119     }
120 }
121 
122 impl VirtioSndPcmCmd {
with_set_params_and_direction( set_params: virtio_snd_pcm_set_params, dir: u8, ) -> VirtioSndPcmCmd123     fn with_set_params_and_direction(
124         set_params: virtio_snd_pcm_set_params,
125         dir: u8,
126     ) -> VirtioSndPcmCmd {
127         let buffer_bytes: u32 = set_params.buffer_bytes.into();
128         let period_bytes: u32 = set_params.period_bytes.into();
129         VirtioSndPcmCmd::SetParams {
130             set_params: SetParams {
131                 channels: set_params.channels,
132                 format: from_virtio_sample_format(set_params.format).unwrap(),
133                 frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),
134                 buffer_bytes: buffer_bytes as usize,
135                 period_bytes: period_bytes as usize,
136                 dir,
137             },
138         }
139     }
140 }
141 
142 // Returns true if the operation is successful. Returns error if there is
143 // a runtime/internal error
process_pcm_ctrl( ex: &Executor, tx_send: &mpsc::UnboundedSender<PcmResponse>, rx_send: &mpsc::UnboundedSender<PcmResponse>, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, cmd: VirtioSndPcmCmd, writer: &mut Writer, stream_id: usize, card_index: usize, ) -> Result<(), Error>144 async fn process_pcm_ctrl(
145     ex: &Executor,
146     tx_send: &mpsc::UnboundedSender<PcmResponse>,
147     rx_send: &mpsc::UnboundedSender<PcmResponse>,
148     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
149     cmd: VirtioSndPcmCmd,
150     writer: &mut Writer,
151     stream_id: usize,
152     card_index: usize,
153 ) -> Result<(), Error> {
154     let streams = streams.read_lock().await;
155     let mut stream = match streams.get(stream_id) {
156         Some(stream_info) => stream_info.lock().await,
157         None => {
158             error!(
159                 "[Card {}] Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",
160                 card_index, stream_id, cmd
161             );
162             return writer
163                 .write_obj(VIRTIO_SND_S_BAD_MSG)
164                 .map_err(Error::WriteResponse);
165         }
166     };
167 
168     debug!("[Card {}] {} for stream id={}", card_index, cmd, stream_id);
169 
170     let result = match cmd {
171         VirtioSndPcmCmd::SetParams { set_params } => {
172             let result = stream.set_params(set_params).await;
173             if result.is_ok() {
174                 debug!(
175                     "[Card {}] VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",
176                     card_index, stream_id, *stream
177                 );
178             }
179             result
180         }
181         VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,
182         VirtioSndPcmCmd::Start => stream.start().await,
183         VirtioSndPcmCmd::Stop => stream.stop().await,
184         VirtioSndPcmCmd::Release => stream.release().await,
185     };
186     match result {
187         Ok(_) => writer
188             .write_obj(VIRTIO_SND_S_OK)
189             .map_err(Error::WriteResponse),
190         Err(Error::OperationNotSupported) => {
191             error!(
192                 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",
193                 card_index, cmd, stream_id
194             );
195 
196             writer
197                 .write_obj(VIRTIO_SND_S_NOT_SUPP)
198                 .map_err(Error::WriteResponse)
199         }
200         Err(e) => {
201             // Runtime/internal error would be more appropriate, but there's
202             // no such error type
203             error!(
204                 "[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",
205                 card_index, cmd, stream_id, e
206             );
207             writer
208                 .write_obj(VIRTIO_SND_S_IO_ERR)
209                 .map_err(Error::WriteResponse)
210         }
211     }
212 }
213 
write_data( mut dst_buf: AsyncPlaybackBuffer<'_>, reader: Option<&mut Reader>, buffer_writer: &mut Box<dyn PlaybackBufferWriter>, ) -> Result<u32, Error>214 async fn write_data(
215     mut dst_buf: AsyncPlaybackBuffer<'_>,
216     reader: Option<&mut Reader>,
217     buffer_writer: &mut Box<dyn PlaybackBufferWriter>,
218 ) -> Result<u32, Error> {
219     let transferred = match reader {
220         Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,
221         None => dst_buf
222             .copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))
223             .map_err(Error::Io)?,
224     };
225 
226     if transferred != buffer_writer.endpoint_period_bytes() {
227         error!(
228             "Bytes written {} != period_bytes {}",
229             transferred,
230             buffer_writer.endpoint_period_bytes()
231         );
232         Err(Error::InvalidBufferSize)
233     } else {
234         dst_buf.commit().await;
235         Ok(dst_buf.latency_bytes())
236     }
237 }
238 
read_data<'a>( mut src_buf: AsyncCaptureBuffer<'a>, writer: Option<&mut Writer>, period_bytes: usize, ) -> Result<u32, Error>239 async fn read_data<'a>(
240     mut src_buf: AsyncCaptureBuffer<'a>,
241     writer: Option<&mut Writer>,
242     period_bytes: usize,
243 ) -> Result<u32, Error> {
244     let transferred = match writer {
245         Some(writer) => src_buf.copy_to(writer),
246         None => src_buf.copy_to(&mut io::sink()),
247     }
248     .map_err(Error::Io)?;
249     if transferred != period_bytes {
250         error!(
251             "Bytes written {} != period_bytes {}",
252             transferred, period_bytes
253         );
254         Err(Error::InvalidBufferSize)
255     } else {
256         src_buf.commit().await;
257         Ok(src_buf.latency_bytes())
258     }
259 }
260 
261 impl From<Result<u32, Error>> for virtio_snd_pcm_status {
from(res: Result<u32, Error>) -> Self262     fn from(res: Result<u32, Error>) -> Self {
263         match res {
264             Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),
265             Err(e) => {
266                 error!("PCM I/O message failed: {}", e);
267                 virtio_snd_pcm_status::new(StatusCode::IoErr, 0)
268             }
269         }
270     }
271 }
272 
273 // Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.
drain_desc_receiver( desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>274 async fn drain_desc_receiver(
275     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
276     sender: &mut mpsc::UnboundedSender<PcmResponse>,
277 ) -> Result<(), Error> {
278     let mut o_desc_chain = desc_receiver.next().await;
279     while let Some(desc_chain) = o_desc_chain {
280         // From the virtio-snd spec:
281         // The device MUST complete all pending I/O messages for the specified stream ID.
282         let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);
283         // Fetch next DescriptorChain to see if the current one is the last one.
284         o_desc_chain = desc_receiver.next().await;
285         let (done, future) = if o_desc_chain.is_none() {
286             let (done, future) = oneshot::channel();
287             (Some(done), Some(future))
288         } else {
289             (None, None)
290         };
291         sender
292             .send(PcmResponse {
293                 desc_chain,
294                 status,
295                 done,
296             })
297             .await
298             .map_err(Error::MpscSend)?;
299 
300         if let Some(f) = future {
301             // From the virtio-snd spec:
302             // The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)
303             // while there are pending I/O messages for the specified stream ID.
304             f.await.map_err(Error::DoneNotTriggered)?;
305         };
306     }
307     Ok(())
308 }
309 
310 /// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx
311 /// queue, and forward them to CRAS. One pcm worker per stream.
312 ///
313 /// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before
314 /// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.
start_pcm_worker( ex: Executor, dstream: DirectionalStream, mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: Rc<AsyncRwLock<WorkerStatus>>, mut sender: mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, card_index: usize, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>315 pub async fn start_pcm_worker(
316     ex: Executor,
317     dstream: DirectionalStream,
318     mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,
319     status_mutex: Rc<AsyncRwLock<WorkerStatus>>,
320     mut sender: mpsc::UnboundedSender<PcmResponse>,
321     period_dur: Duration,
322     card_index: usize,
323     release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
324 ) -> Result<(), Error> {
325     let res = pcm_worker_loop(
326         ex,
327         dstream,
328         &mut desc_receiver,
329         &status_mutex,
330         &mut sender,
331         period_dur,
332         card_index,
333         release_signal,
334     )
335     .await;
336     *status_mutex.lock().await = WorkerStatus::Quit;
337     if res.is_err() {
338         error!(
339             "[Card {}] pcm_worker error: {:#?}. Draining desc_receiver",
340             card_index,
341             res.as_ref().err()
342         );
343         // On error, guaranteed that desc_receiver has not been drained, so drain it here.
344         // Note that drain blocks until the stream is release.
345         drain_desc_receiver(&mut desc_receiver, &mut sender).await?;
346     }
347     res
348 }
349 
pcm_worker_loop( ex: Executor, dstream: DirectionalStream, desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>, status_mutex: &Rc<AsyncRwLock<WorkerStatus>>, sender: &mut mpsc::UnboundedSender<PcmResponse>, period_dur: Duration, card_index: usize, release_signal: Rc<(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>350 async fn pcm_worker_loop(
351     ex: Executor,
352     dstream: DirectionalStream,
353     desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,
354     status_mutex: &Rc<AsyncRwLock<WorkerStatus>>,
355     sender: &mut mpsc::UnboundedSender<PcmResponse>,
356     period_dur: Duration,
357     card_index: usize,
358     release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,
359 ) -> Result<(), Error> {
360     let on_release = async {
361         await_reset_signal(Some(&*release_signal)).await;
362         // After receiving release signal, wait for up to 2 periods,
363         // giving it a chance to respond to the last buffer.
364         if let Err(e) = TimerAsync::sleep(&ex, period_dur * 2).await {
365             error!(
366                 "[Card {}] Error on sleep after receiving reset signal: {}",
367                 card_index, e
368             )
369         }
370     }
371     .fuse();
372     pin_mut!(on_release);
373 
374     match dstream {
375         DirectionalStream::Output(mut sys_direction_output) => loop {
376             #[cfg(windows)]
377             let (mut stream, mut buffer_writer_lock) = (
378                 sys_direction_output
379                     .async_playback_buffer_stream
380                     .lock()
381                     .await,
382                 sys_direction_output.buffer_writer.lock().await,
383             );
384             #[cfg(windows)]
385             let buffer_writer = &mut buffer_writer_lock;
386             #[cfg(any(target_os = "android", target_os = "linux"))]
387             let (stream, buffer_writer) = (
388                 &mut sys_direction_output.async_playback_buffer_stream,
389                 &mut sys_direction_output.buffer_writer,
390             );
391 
392             let next_buf = stream.next_playback_buffer(&ex).fuse();
393             pin_mut!(next_buf);
394 
395             let dst_buf = select! {
396                 _ = on_release => {
397                     drain_desc_receiver(desc_receiver, sender).await?;
398                     break Ok(());
399                 },
400                 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
401             };
402             let worker_status = status_mutex.lock().await;
403             match *worker_status {
404                 WorkerStatus::Quit => {
405                     drain_desc_receiver(desc_receiver, sender).await?;
406                     if let Err(e) = write_data(dst_buf, None, buffer_writer).await {
407                         error!(
408                             "[Card {}] Error on write_data after worker quit: {}",
409                             card_index, e
410                         )
411                     }
412                     break Ok(());
413                 }
414                 WorkerStatus::Pause => {
415                     write_data(dst_buf, None, buffer_writer).await?;
416                 }
417                 WorkerStatus::Running => match desc_receiver.try_next() {
418                     Err(e) => {
419                         error!(
420                             "[Card {}] Underrun. No new DescriptorChain while running: {}",
421                             card_index, e
422                         );
423                         write_data(dst_buf, None, buffer_writer).await?;
424                     }
425                     Ok(None) => {
426                         error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
427                         write_data(dst_buf, None, buffer_writer).await?;
428                         return Err(Error::InvalidPCMWorkerState);
429                     }
430                     Ok(Some(mut desc_chain)) => {
431                         // stream_id was already read in handle_pcm_queue
432                         let status =
433                             write_data(dst_buf, Some(&mut desc_chain.reader), buffer_writer)
434                                 .await
435                                 .into();
436                         sender
437                             .send(PcmResponse {
438                                 desc_chain,
439                                 status,
440                                 done: None,
441                             })
442                             .await
443                             .map_err(Error::MpscSend)?;
444                     }
445                 },
446             }
447         },
448         DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {
449             let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
450             pin_mut!(next_buf);
451 
452             let src_buf = select! {
453                 _ = on_release => {
454                     drain_desc_receiver(desc_receiver, sender).await?;
455                     break Ok(());
456                 },
457                 buf = next_buf => buf.map_err(Error::FetchBuffer)?,
458             };
459 
460             let worker_status = status_mutex.lock().await;
461             match *worker_status {
462                 WorkerStatus::Quit => {
463                     drain_desc_receiver(desc_receiver, sender).await?;
464                     if let Err(e) = read_data(src_buf, None, period_bytes).await {
465                         error!(
466                             "[Card {}] Error on read_data after worker quit: {}",
467                             card_index, e
468                         )
469                     }
470                     break Ok(());
471                 }
472                 WorkerStatus::Pause => {
473                     read_data(src_buf, None, period_bytes).await?;
474                 }
475                 WorkerStatus::Running => match desc_receiver.try_next() {
476                     Err(e) => {
477                         error!(
478                             "[Card {}] Overrun. No new DescriptorChain while running: {}",
479                             card_index, e
480                         );
481                         read_data(src_buf, None, period_bytes).await?;
482                     }
483                     Ok(None) => {
484                         error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
485                         read_data(src_buf, None, period_bytes).await?;
486                         return Err(Error::InvalidPCMWorkerState);
487                     }
488                     Ok(Some(mut desc_chain)) => {
489                         let status = read_data(src_buf, Some(&mut desc_chain.writer), period_bytes)
490                             .await
491                             .into();
492                         sender
493                             .send(PcmResponse {
494                                 desc_chain,
495                                 status,
496                                 done: None,
497                             })
498                             .await
499                             .map_err(Error::MpscSend)?;
500                     }
501                 },
502             }
503         },
504     }
505 }
506 
507 // Defer pcm message response to the pcm response worker
defer_pcm_response_to_worker( desc_chain: DescriptorChain, status: virtio_snd_pcm_status, response_sender: &mut mpsc::UnboundedSender<PcmResponse>, ) -> Result<(), Error>508 async fn defer_pcm_response_to_worker(
509     desc_chain: DescriptorChain,
510     status: virtio_snd_pcm_status,
511     response_sender: &mut mpsc::UnboundedSender<PcmResponse>,
512 ) -> Result<(), Error> {
513     response_sender
514         .send(PcmResponse {
515             desc_chain,
516             status,
517             done: None,
518         })
519         .await
520         .map_err(Error::MpscSend)
521 }
522 
send_pcm_response( mut desc_chain: DescriptorChain, queue: &mut Queue, status: virtio_snd_pcm_status, ) -> Result<(), Error>523 fn send_pcm_response(
524     mut desc_chain: DescriptorChain,
525     queue: &mut Queue,
526     status: virtio_snd_pcm_status,
527 ) -> Result<(), Error> {
528     let writer = &mut desc_chain.writer;
529 
530     // For rx queue only. Fast forward the unused audio data buffer.
531     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
532         writer
533             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
534     }
535     writer.write_obj(status).map_err(Error::WriteResponse)?;
536     let len = writer.bytes_written() as u32;
537     queue.add_used(desc_chain, len);
538     queue.trigger_interrupt();
539     Ok(())
540 }
541 
542 // Await until reset_signal has been released
await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>)543 async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>) {
544     match reset_signal_option {
545         Some((lock, cvar)) => {
546             let mut reset = lock.lock().await;
547             while !*reset {
548                 reset = cvar.wait(reset).await;
549             }
550         }
551         None => futures::future::pending().await,
552     };
553 }
554 
send_pcm_response_worker( queue: Rc<AsyncRwLock<Queue>>, recv: &mut mpsc::UnboundedReceiver<PcmResponse>, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>555 pub async fn send_pcm_response_worker(
556     queue: Rc<AsyncRwLock<Queue>>,
557     recv: &mut mpsc::UnboundedReceiver<PcmResponse>,
558     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
559 ) -> Result<(), Error> {
560     let on_reset = await_reset_signal(reset_signal).fuse();
561     pin_mut!(on_reset);
562 
563     loop {
564         let next_async = recv.next().fuse();
565         pin_mut!(next_async);
566 
567         let res = select! {
568             _ = on_reset => break,
569             res = next_async => res,
570         };
571 
572         if let Some(r) = res {
573             send_pcm_response(r.desc_chain, &mut *queue.lock().await, r.status)?;
574 
575             // Resume pcm_worker
576             if let Some(done) = r.done {
577                 done.send(()).map_err(Error::OneshotSend)?;
578             }
579         } else {
580             debug!("PcmResponse channel is closed.");
581             break;
582         }
583     }
584     Ok(())
585 }
586 
587 /// Handle messages from the tx or the rx queue. One invocation is needed for
588 /// each queue.
handle_pcm_queue( streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, mut response_sender: mpsc::UnboundedSender<PcmResponse>, queue: Rc<AsyncRwLock<Queue>>, queue_event: &EventAsync, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>589 pub async fn handle_pcm_queue(
590     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
591     mut response_sender: mpsc::UnboundedSender<PcmResponse>,
592     queue: Rc<AsyncRwLock<Queue>>,
593     queue_event: &EventAsync,
594     card_index: usize,
595     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
596 ) -> Result<(), Error> {
597     let on_reset = await_reset_signal(reset_signal).fuse();
598     pin_mut!(on_reset);
599 
600     loop {
601         // Manual queue.next_async() to avoid holding the mutex
602         let next_async = async {
603             loop {
604                 // Check if there are more descriptors available.
605                 if let Some(chain) = queue.lock().await.pop() {
606                     return Ok(chain);
607                 }
608                 queue_event.next_val().await?;
609             }
610         }
611         .fuse();
612         pin_mut!(next_async);
613 
614         let mut desc_chain = select! {
615             _ = on_reset => break,
616             res = next_async => res.map_err(Error::Async)?,
617         };
618 
619         let pcm_xfer: virtio_snd_pcm_xfer =
620             desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;
621         let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;
622 
623         let streams = streams.read_lock().await;
624         let stream_info = match streams.get(stream_id) {
625             Some(stream_info) => stream_info.read_lock().await,
626             None => {
627                 error!(
628                     "[Card {}] stream_id ({}) >= num_streams ({})",
629                     card_index,
630                     stream_id,
631                     streams.len()
632                 );
633                 defer_pcm_response_to_worker(
634                     desc_chain,
635                     virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
636                     &mut response_sender,
637                 )
638                 .await?;
639                 continue;
640             }
641         };
642 
643         match stream_info.sender.as_ref() {
644             Some(mut s) => {
645                 s.send(desc_chain).await.map_err(Error::MpscSend)?;
646                 if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {
647                     // If sender channel is still intact but worker status is quit,
648                     // the worker quitted unexpectedly. Return error to request a reset.
649                     return Err(Error::PCMWorkerQuittedUnexpectedly);
650                 }
651             }
652             None => {
653                 if !stream_info.just_reset {
654                     error!(
655                         "[Card {}] stream {} is not ready. state: {}",
656                         card_index,
657                         stream_id,
658                         get_virtio_snd_r_pcm_cmd_name(stream_info.state)
659                     );
660                 }
661                 defer_pcm_response_to_worker(
662                     desc_chain,
663                     virtio_snd_pcm_status::new(StatusCode::IoErr, 0),
664                     &mut response_sender,
665                 )
666                 .await?;
667             }
668         };
669     }
670     Ok(())
671 }
672 
673 /// Handle all the control messages from the ctrl queue.
handle_ctrl_queue( ex: &Executor, streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>, snd_data: &SndData, queue: Rc<AsyncRwLock<Queue>>, queue_event: &mut EventAsync, tx_send: mpsc::UnboundedSender<PcmResponse>, rx_send: mpsc::UnboundedSender<PcmResponse>, card_index: usize, reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>, ) -> Result<(), Error>674 pub async fn handle_ctrl_queue(
675     ex: &Executor,
676     streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,
677     snd_data: &SndData,
678     queue: Rc<AsyncRwLock<Queue>>,
679     queue_event: &mut EventAsync,
680     tx_send: mpsc::UnboundedSender<PcmResponse>,
681     rx_send: mpsc::UnboundedSender<PcmResponse>,
682     card_index: usize,
683     reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,
684 ) -> Result<(), Error> {
685     let on_reset = await_reset_signal(reset_signal).fuse();
686     pin_mut!(on_reset);
687 
688     let mut queue = queue.lock().await;
689     loop {
690         let mut desc_chain = {
691             let next_async = queue.next_async(queue_event).fuse();
692             pin_mut!(next_async);
693 
694             select! {
695                 _ = on_reset => break,
696                 res = next_async => res.map_err(Error::Async)?,
697             }
698         };
699 
700         let reader = &mut desc_chain.reader;
701         let writer = &mut desc_chain.writer;
702         // Don't advance the reader
703         let code = reader
704             .peek_obj::<virtio_snd_hdr>()
705             .map_err(Error::ReadMessage)?
706             .code
707             .into();
708 
709         let handle_ctrl_msg = async {
710             return match code {
711                 VIRTIO_SND_R_JACK_INFO => {
712                     let query_info: virtio_snd_query_info =
713                         reader.read_obj().map_err(Error::ReadMessage)?;
714                     let start_id: usize = u32::from(query_info.start_id) as usize;
715                     let count: usize = u32::from(query_info.count) as usize;
716                     if start_id + count > snd_data.jack_info.len() {
717                         error!(
718                             "[Card {}] start_id({}) + count({}) must be smaller than \
719                             the number of jacks ({})",
720                             card_index,
721                             start_id,
722                             count,
723                             snd_data.jack_info.len()
724                         );
725                         return writer
726                             .write_obj(VIRTIO_SND_S_BAD_MSG)
727                             .map_err(Error::WriteResponse);
728                     }
729                     // The response consists of the virtio_snd_hdr structure (contains the request
730                     // status code), followed by the device-writable information structures of the
731                     // item. Each information structure begins with the following common header
732                     writer
733                         .write_obj(VIRTIO_SND_S_OK)
734                         .map_err(Error::WriteResponse)?;
735                     for i in start_id..(start_id + count) {
736                         writer
737                             .write_all(snd_data.jack_info[i].as_bytes())
738                             .map_err(Error::WriteResponse)?;
739                     }
740                     Ok(())
741                 }
742                 VIRTIO_SND_R_PCM_INFO => {
743                     let query_info: virtio_snd_query_info =
744                         reader.read_obj().map_err(Error::ReadMessage)?;
745                     let start_id: usize = u32::from(query_info.start_id) as usize;
746                     let count: usize = u32::from(query_info.count) as usize;
747                     if start_id + count > snd_data.pcm_info.len() {
748                         error!(
749                             "[Card {}] start_id({}) + count({}) must be smaller than \
750                             the number of streams ({})",
751                             card_index,
752                             start_id,
753                             count,
754                             snd_data.pcm_info.len()
755                         );
756                         return writer
757                             .write_obj(VIRTIO_SND_S_BAD_MSG)
758                             .map_err(Error::WriteResponse);
759                     }
760                     // The response consists of the virtio_snd_hdr structure (contains the request
761                     // status code), followed by the device-writable information structures of the
762                     // item. Each information structure begins with the following common header
763                     writer
764                         .write_obj(VIRTIO_SND_S_OK)
765                         .map_err(Error::WriteResponse)?;
766                     for i in start_id..(start_id + count) {
767                         writer
768                             .write_all(snd_data.pcm_info[i].as_bytes())
769                             .map_err(Error::WriteResponse)?;
770                     }
771                     Ok(())
772                 }
773                 VIRTIO_SND_R_CHMAP_INFO => {
774                     let query_info: virtio_snd_query_info =
775                         reader.read_obj().map_err(Error::ReadMessage)?;
776                     let start_id: usize = u32::from(query_info.start_id) as usize;
777                     let count: usize = u32::from(query_info.count) as usize;
778                     if start_id + count > snd_data.chmap_info.len() {
779                         error!(
780                             "[Card {}] start_id({}) + count({}) must be smaller than \
781                             the number of chmaps ({})",
782                             card_index,
783                             start_id,
784                             count,
785                             snd_data.chmap_info.len()
786                         );
787                         return writer
788                             .write_obj(VIRTIO_SND_S_BAD_MSG)
789                             .map_err(Error::WriteResponse);
790                     }
791                     // The response consists of the virtio_snd_hdr structure (contains the request
792                     // status code), followed by the device-writable information structures of the
793                     // item. Each information structure begins with the following common header
794                     writer
795                         .write_obj(VIRTIO_SND_S_OK)
796                         .map_err(Error::WriteResponse)?;
797                     for i in start_id..(start_id + count) {
798                         writer
799                             .write_all(snd_data.chmap_info[i].as_bytes())
800                             .map_err(Error::WriteResponse)?;
801                     }
802                     Ok(())
803                 }
804                 VIRTIO_SND_R_JACK_REMAP => {
805                     unreachable!("remap is unsupported");
806                 }
807                 VIRTIO_SND_R_PCM_SET_PARAMS => {
808                     // Raise VIRTIO_SND_S_BAD_MSG or IO error?
809                     let set_params: virtio_snd_pcm_set_params =
810                         reader.read_obj().map_err(Error::ReadMessage)?;
811                     let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;
812                     let buffer_bytes: u32 = set_params.buffer_bytes.into();
813                     let period_bytes: u32 = set_params.period_bytes.into();
814 
815                     let dir = match snd_data.pcm_info.get(stream_id) {
816                         Some(pcm_info) => {
817                             if set_params.channels < pcm_info.channels_min
818                                 || set_params.channels > pcm_info.channels_max
819                             {
820                                 error!(
821                                     "[Card {}] Number of channels ({}) must be between {} and {}",
822                                     card_index,
823                                     set_params.channels,
824                                     pcm_info.channels_min,
825                                     pcm_info.channels_max
826                                 );
827                                 return writer
828                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
829                                     .map_err(Error::WriteResponse);
830                             }
831                             if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {
832                                 error!(
833                                     "[Card {}] PCM format {} is not supported.",
834                                     card_index, set_params.format
835                                 );
836                                 return writer
837                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
838                                     .map_err(Error::WriteResponse);
839                             }
840                             if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {
841                                 error!(
842                                     "[Card {}] PCM frame rate {} is not supported.",
843                                     card_index, set_params.rate
844                                 );
845                                 return writer
846                                     .write_obj(VIRTIO_SND_S_NOT_SUPP)
847                                     .map_err(Error::WriteResponse);
848                             }
849 
850                             pcm_info.direction
851                         }
852                         None => {
853                             error!(
854                                 "[Card {}] stream_id {} < streams {}",
855                                 card_index,
856                                 stream_id,
857                                 snd_data.pcm_info.len()
858                             );
859                             return writer
860                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
861                                 .map_err(Error::WriteResponse);
862                         }
863                     };
864 
865                     if set_params.features != 0 {
866                         error!("[Card {}] No feature is supported", card_index);
867                         return writer
868                             .write_obj(VIRTIO_SND_S_NOT_SUPP)
869                             .map_err(Error::WriteResponse);
870                     }
871 
872                     if buffer_bytes % period_bytes != 0 {
873                         error!(
874                             "[Card {}] buffer_bytes({}) must be dividable by period_bytes({})",
875                             card_index, buffer_bytes, period_bytes
876                         );
877                         return writer
878                             .write_obj(VIRTIO_SND_S_BAD_MSG)
879                             .map_err(Error::WriteResponse);
880                     }
881 
882                     process_pcm_ctrl(
883                         ex,
884                         &tx_send,
885                         &rx_send,
886                         streams,
887                         VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),
888                         writer,
889                         stream_id,
890                         card_index,
891                     )
892                     .await
893                 }
894                 VIRTIO_SND_R_PCM_PREPARE
895                 | VIRTIO_SND_R_PCM_START
896                 | VIRTIO_SND_R_PCM_STOP
897                 | VIRTIO_SND_R_PCM_RELEASE => {
898                     let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;
899                     let stream_id: usize = u32::from(hdr.stream_id) as usize;
900                     let cmd = match VirtioSndPcmCmd::try_from(code) {
901                         Ok(cmd) => cmd,
902                         Err(err) => {
903                             error!(
904                                 "[Card {}] Error converting code to command: {}",
905                                 card_index, err
906                             );
907                             return writer
908                                 .write_obj(VIRTIO_SND_S_BAD_MSG)
909                                 .map_err(Error::WriteResponse);
910                         }
911                     };
912                     process_pcm_ctrl(
913                         ex, &tx_send, &rx_send, streams, cmd, writer, stream_id, card_index,
914                     )
915                     .await
916                     .and(Ok(()))?;
917                     Ok(())
918                 }
919                 c => {
920                     error!("[Card {}] Unrecognized code: {}", card_index, c);
921                     return writer
922                         .write_obj(VIRTIO_SND_S_BAD_MSG)
923                         .map_err(Error::WriteResponse);
924                 }
925             };
926         };
927 
928         handle_ctrl_msg.await?;
929         let len = writer.bytes_written() as u32;
930         queue.add_used(desc_chain, len);
931         queue.trigger_interrupt();
932     }
933     Ok(())
934 }
935 
936 /// Send events to the audio driver.
handle_event_queue( mut queue: Queue, mut queue_event: EventAsync, ) -> Result<(), Error>937 pub async fn handle_event_queue(
938     mut queue: Queue,
939     mut queue_event: EventAsync,
940 ) -> Result<(), Error> {
941     loop {
942         let desc_chain = queue
943             .next_async(&mut queue_event)
944             .await
945             .map_err(Error::Async)?;
946 
947         // TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)
948         queue.add_used(desc_chain, 0);
949         queue.trigger_interrupt();
950     }
951 }
952