xref: /aosp_15_r20/external/crosvm/common/audio_streams/src/audio_streams.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2019 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Provides an interface for playing and recording audio.
6 //!
7 //! When implementing an audio playback system, the `StreamSource` trait is implemented.
8 //! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9 //! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10 //! buffers can be filled with `write_playback_buffer`.
11 //!
12 //! Users playing audio fill the provided buffers with audio. When a `PlaybackBuffer` is dropped,
13 //! the samples written to it are committed to the `PlaybackBufferStream` it came from.
14 //!
15 //! ```
16 //! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17 //! use std::io::Write;
18 //!
19 //! const buffer_size: usize = 120;
20 //! const num_channels: usize = 2;
21 //!
22 //! # fn main() -> std::result::Result<(), BoxError> {
23 //! let mut stream_source = NoopStreamSource::new();
24 //! let sample_format = SampleFormat::S16LE;
25 //! let frame_size = num_channels * sample_format.sample_bytes();
26 //!
27 //! let (_, mut stream) = stream_source
28 //!     .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29 //! // Play 10 buffers of DC.
30 //! let mut buf = Vec::new();
31 //! buf.resize(buffer_size * frame_size, 0xa5u8);
32 //! for _ in 0..10 {
33 //!     let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34 //!         assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35 //!         Ok(())
36 //!     };
37 //!     stream.write_playback_buffer(&mut copy_cb)?;
38 //! }
39 //! # Ok (())
40 //! # }
41 //! ```
42 pub mod async_api;
43 
44 use std::cmp::min;
45 use std::error;
46 use std::fmt;
47 use std::fmt::Display;
48 use std::io;
49 use std::io::Read;
50 use std::io::Write;
51 #[cfg(unix)]
52 use std::os::unix::io::RawFd as RawDescriptor;
53 #[cfg(windows)]
54 use std::os::windows::io::RawHandle as RawDescriptor;
55 use std::result::Result;
56 use std::str::FromStr;
57 use std::time::Duration;
58 use std::time::Instant;
59 
60 pub use async_api::AsyncStream;
61 pub use async_api::AudioStreamsExecutor;
62 use async_trait::async_trait;
63 use remain::sorted;
64 use serde::Deserialize;
65 use serde::Serialize;
66 use thiserror::Error;
67 
/// Sample encodings supported by the audio streams in this module.
///
/// The number of bytes one sample occupies in memory is given by `sample_bytes`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SampleFormat {
    /// Unsigned 8 bit samples.
    U8,
    /// Signed 16 bit little endian samples.
    S16LE,
    /// Signed 24 bit little endian samples (stored in 4 byte chunks, see `sample_bytes`).
    S24LE,
    /// Signed 32 bit little endian samples.
    S32LE,
}
75 
76 impl SampleFormat {
sample_bytes(self) -> usize77     pub fn sample_bytes(self) -> usize {
78         use SampleFormat::*;
79         match self {
80             U8 => 1,
81             S16LE => 2,
82             S24LE => 4, // Not a typo, S24_LE samples are stored in 4 byte chunks.
83             S32LE => 4,
84         }
85     }
86 }
87 
88 impl Display for SampleFormat {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result89     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90         use SampleFormat::*;
91         match self {
92             U8 => write!(f, "Unsigned 8 bit"),
93             S16LE => write!(f, "Signed 16 bit Little Endian"),
94             S24LE => write!(f, "Signed 24 bit Little Endian"),
95             S32LE => write!(f, "Signed 32 bit Little Endian"),
96         }
97     }
98 }
99 
100 impl FromStr for SampleFormat {
101     type Err = SampleFormatError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>102     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
103         match s {
104             "U8" => Ok(SampleFormat::U8),
105             "S16_LE" => Ok(SampleFormat::S16LE),
106             "S24_LE" => Ok(SampleFormat::S24LE),
107             "S32_LE" => Ok(SampleFormat::S32LE),
108             _ => Err(SampleFormatError::InvalidSampleFormat),
109         }
110     }
111 }
112 
/// Errors that are possible from a `SampleFormat`.
#[sorted]
#[derive(Error, Debug)]
pub enum SampleFormatError {
    /// The string being parsed is not one of the names accepted by `SampleFormat::from_str`.
    #[error("Must be in [U8, S16_LE, S24_LE, S32_LE]")]
    InvalidSampleFormat,
}
120 
/// Valid directions of an audio stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StreamDirection {
    /// The stream is used for playing audio.
    Playback,
    /// The stream is used for recording audio.
    Capture,
}
127 
/// Valid effects for an audio stream.
///
/// `NoEffect` is the `Default` value.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub enum StreamEffect {
    /// No effect is requested for the stream.
    #[default]
    NoEffect,
    /// Echo cancellation; `aec` is accepted as an alias when deserializing.
    #[serde(alias = "aec")]
    EchoCancellation,
}
136 
137 pub mod capture;
138 pub mod shm_streams;
139 
/// Errors that can pass across threads.
///
/// A boxed error trait object carrying `Send + Sync` bounds; it is the error type returned by
/// the stream traits declared in this file.
pub type BoxError = Box<dyn error::Error + Send + Sync>;
142 
/// Errors that are possible from a `StreamEffect`.
#[sorted]
#[derive(Error, Debug)]
pub enum StreamEffectError {
    /// The string being parsed is not one of the names accepted by `StreamEffect::from_str`.
    #[error("Must be in [EchoCancellation, aec]")]
    InvalidEffect,
}
150 
151 impl FromStr for StreamEffect {
152     type Err = StreamEffectError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>153     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
154         match s {
155             "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
156             _ => Err(StreamEffectError::InvalidEffect),
157         }
158     }
159 }
160 
/// Generic errors of this module; used by default trait method implementations that have no
/// real behavior to offer.
#[sorted]
#[derive(Error, Debug)]
pub enum Error {
    /// The requested operation is not implemented.
    #[error("Unimplemented")]
    Unimplemented,
}
167 
/// `StreamSourceGenerator` is a trait used to abstract types that create [`StreamSource`].
/// It can be used when multiple types of `StreamSource` are needed.
pub trait StreamSourceGenerator: Sync + Send {
    /// Creates a new boxed [`StreamSource`], or returns an error if one cannot be produced.
    fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError>;
}
173 
174 /// `StreamSource` creates streams for playback or capture of audio.
175 #[async_trait(?Send)]
176 pub trait StreamSource: Send {
177     /// Returns a stream control and buffer generator object. These are separate as the buffer
178     /// generator might want to be passed to the audio stream.
179     #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>180     fn new_playback_stream(
181         &mut self,
182         num_channels: usize,
183         format: SampleFormat,
184         frame_rate: u32,
185         buffer_size: usize,
186     ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;
187 
188     /// Returns a stream control and async buffer generator object. These are separate as the buffer
189     /// generator might want to be passed to the audio stream.
190     #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, _num_channels: usize, _format: SampleFormat, _frame_rate: u32, _buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>191     fn new_async_playback_stream(
192         &mut self,
193         _num_channels: usize,
194         _format: SampleFormat,
195         _frame_rate: u32,
196         _buffer_size: usize,
197         _ex: &dyn AudioStreamsExecutor,
198     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
199         Err(Box::new(Error::Unimplemented))
200     }
201 
202     /// Returns a stream control and async buffer generator object asynchronously.
203     /// Default implementation calls and blocks on `new_async_playback_stream()`.
204     #[allow(clippy::type_complexity)]
async_new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>205     async fn async_new_async_playback_stream(
206         &mut self,
207         num_channels: usize,
208         format: SampleFormat,
209         frame_rate: u32,
210         buffer_size: usize,
211         ex: &dyn AudioStreamsExecutor,
212     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
213         self.new_async_playback_stream(num_channels, format, frame_rate, buffer_size, ex)
214     }
215 
216     /// Returns a stream control and buffer generator object. These are separate as the buffer
217     /// generator might want to be passed to the audio stream.
218     /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
219     #[allow(clippy::type_complexity)]
new_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::CaptureBufferStream>, ), BoxError, >220     fn new_capture_stream(
221         &mut self,
222         num_channels: usize,
223         format: SampleFormat,
224         frame_rate: u32,
225         buffer_size: usize,
226         _effects: &[StreamEffect],
227     ) -> Result<
228         (
229             Box<dyn StreamControl>,
230             Box<dyn capture::CaptureBufferStream>,
231         ),
232         BoxError,
233     > {
234         Ok((
235             Box::new(NoopStreamControl::new()),
236             Box::new(capture::NoopCaptureStream::new(
237                 num_channels,
238                 format,
239                 frame_rate,
240                 buffer_size,
241             )),
242         ))
243     }
244 
245     /// Returns a stream control and async buffer generator object. These are separate as the buffer
246     /// generator might want to be passed to the audio stream.
247     /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
248     #[allow(clippy::type_complexity)]
new_async_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _ex: &dyn AudioStreamsExecutor, ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::AsyncCaptureBufferStream>, ), BoxError, >249     fn new_async_capture_stream(
250         &mut self,
251         num_channels: usize,
252         format: SampleFormat,
253         frame_rate: u32,
254         buffer_size: usize,
255         _effects: &[StreamEffect],
256         _ex: &dyn AudioStreamsExecutor,
257     ) -> Result<
258         (
259             Box<dyn StreamControl>,
260             Box<dyn capture::AsyncCaptureBufferStream>,
261         ),
262         BoxError,
263     > {
264         Ok((
265             Box::new(NoopStreamControl::new()),
266             Box::new(capture::NoopCaptureStream::new(
267                 num_channels,
268                 format,
269                 frame_rate,
270                 buffer_size,
271             )),
272         ))
273     }
274 
275     /// Returns a stream control and async buffer generator object asynchronously.
276     /// Default implementation calls and blocks on `new_async_capture_stream()`.
277     #[allow(clippy::type_complexity)]
async_new_async_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], ex: &dyn AudioStreamsExecutor, ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::AsyncCaptureBufferStream>, ), BoxError, >278     async fn async_new_async_capture_stream(
279         &mut self,
280         num_channels: usize,
281         format: SampleFormat,
282         frame_rate: u32,
283         buffer_size: usize,
284         effects: &[StreamEffect],
285         ex: &dyn AudioStreamsExecutor,
286     ) -> Result<
287         (
288             Box<dyn StreamControl>,
289             Box<dyn capture::AsyncCaptureBufferStream>,
290         ),
291         BoxError,
292     > {
293         self.new_async_capture_stream(num_channels, format, frame_rate, buffer_size, effects, ex)
294     }
295 
296     /// Returns any open file descriptors needed by the implementor. The FD list helps users of the
297     /// StreamSource enter Linux jails making sure not to close needed FDs.
keep_rds(&self) -> Option<Vec<RawDescriptor>>298     fn keep_rds(&self) -> Option<Vec<RawDescriptor>> {
299         None
300     }
301 }
302 
303 /// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
304 pub trait PlaybackBufferStream: Send {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>305     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;
306 
307     /// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
308     /// write playback data to the given `PlaybackBuffer`.
write_playback_buffer<'b, 's: 'b>( &'s mut self, f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>, ) -> Result<(), BoxError>309     fn write_playback_buffer<'b, 's: 'b>(
310         &'s mut self,
311         f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
312     ) -> Result<(), BoxError> {
313         let mut buf = self.next_playback_buffer()?;
314         f(&mut buf)?;
315         buf.commit();
316         Ok(())
317     }
318 }
319 
320 impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>321     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
322         (**self).next_playback_buffer()
323     }
324 }
325 
/// `AsyncPlaybackBufferStream` provides `AsyncPlaybackBuffer`s asynchronously to fill with audio
/// samples for playback.
#[async_trait(?Send)]
pub trait AsyncPlaybackBufferStream: Send {
    /// Asynchronously returns the next `AsyncPlaybackBuffer` to be filled by the caller.
    /// `_ex` is the executor the implementation may use while waiting.
    async fn next_playback_buffer<'a>(
        &'a mut self,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
}
335 
336 #[async_trait(?Send)]
337 impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>338     async fn next_playback_buffer<'a>(
339         &'a mut self,
340         ex: &dyn AudioStreamsExecutor,
341     ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
342         (**self).next_playback_buffer(ex).await
343     }
344 }
345 
346 /// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
347 /// write playback data to the given `AsyncPlaybackBuffer`.
348 ///
349 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_write_playback_buffer<F>( stream: &mut dyn AsyncPlaybackBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,350 pub async fn async_write_playback_buffer<F>(
351     stream: &mut dyn AsyncPlaybackBufferStream,
352     f: F,
353     ex: &dyn AudioStreamsExecutor,
354 ) -> Result<(), BoxError>
355 where
356     F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
357 {
358     let mut buf = stream.next_playback_buffer(ex).await?;
359     f(&mut buf)?;
360     buf.commit().await;
361     Ok(())
362 }
363 
/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
/// is separate from the stream so it can be owned by a different thread if needed.
pub trait StreamControl: Send + Sync {
    /// Sets the stream volume from the given scaler. The default implementation does nothing.
    fn set_volume(&mut self, _scaler: f64) {}
    /// Sets the mute state of the stream. The default implementation does nothing.
    fn set_mute(&mut self, _mute: bool) {}
}
370 
/// `BufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
pub trait BufferCommit {
    /// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
    /// indicates the number of audio frames that were read or written to the device.
    fn commit(&mut self, nframes: usize);
    /// `latency_bytes` the current device latency.
    /// For playback it means how many bytes need to be consumed
    /// before the current playback buffer will be played.
    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
    ///
    /// The default implementation reports a latency of `0`.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
385 
/// `AsyncBufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
#[async_trait(?Send)]
pub trait AsyncBufferCommit {
    /// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
    /// automatically. `nframes` indicates the number of audio frames that were read or written to
    /// the device.
    async fn commit(&mut self, nframes: usize);
    /// `latency_bytes` the current device latency.
    /// For playback it means how many bytes need to be consumed
    /// before the current playback buffer will be played.
    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
    ///
    /// The default implementation reports a latency of `0`.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
402 
/// Errors that are possible from a `PlaybackBuffer`.
#[sorted]
#[derive(Error, Debug)]
pub enum PlaybackBufferError {
    /// The backing buffer length is not valid for the requested frame size, e.g. it does not
    /// hold a whole number of frames.
    #[error("Invalid buffer length")]
    InvalidLength,
    /// A slice of the playback buffer was requested outside of its bounds.
    #[error("Slicing of playback buffer out of bounds")]
    SliceOutOfBounds,
}
412 
/// `AudioBuffer` is one buffer that holds buffer_size audio frames.
/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
struct AudioBuffer<'a> {
    // Backing memory for the audio samples.
    buffer: &'a mut [u8],
    // Read or write position as a byte index into `buffer`. The code slices `buffer` with it
    // directly and divides it by `frame_size` to obtain a frame count.
    offset: usize,
    frame_size: usize, // Size of a frame in bytes.
}
420 
421 impl<'a> AudioBuffer<'a> {
422     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize423     pub fn frame_capacity(&self) -> usize {
424         self.buffer.len() / self.frame_size
425     }
426 
calc_len(&self, size: usize) -> usize427     fn calc_len(&self, size: usize) -> usize {
428         min(
429             size / self.frame_size * self.frame_size,
430             self.buffer.len() - self.offset,
431         )
432     }
433 
434     /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>435     pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
436         // only write complete frames.
437         let len = self.calc_len(size);
438         cb(&mut self.buffer[self.offset..(self.offset + len)]);
439         self.offset += len;
440         Ok(len)
441     }
442 
443     /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>444     pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
445         let len = self.calc_len(size);
446         cb(&self.buffer[self.offset..(self.offset + len)]);
447         self.offset += len;
448         Ok(len)
449     }
450 
451     /// Copy data from an io::Reader
copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize>452     pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
453         let bytes = reader.read(&mut self.buffer[self.offset..])?;
454         self.offset += bytes;
455         Ok(bytes)
456     }
457 
458     /// Copy data to an io::Write
copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize>459     pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
460         let bytes = writer.write(&self.buffer[self.offset..])?;
461         self.offset += bytes;
462         Ok(bytes)
463     }
464 }
465 
466 impl<'a> Write for AudioBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>467     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
468         let written = (&mut self.buffer[self.offset..]).write(&buf[..buf.len()])?;
469         self.offset += written;
470         Ok(written)
471     }
472 
flush(&mut self) -> io::Result<()>473     fn flush(&mut self) -> io::Result<()> {
474         Ok(())
475     }
476 }
477 
478 impl<'a> Read for AudioBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>479     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
480         let len = buf.len() / self.frame_size * self.frame_size;
481         let written = (&mut buf[..len]).write(&self.buffer[self.offset..])?;
482         self.offset += written;
483         Ok(written)
484     }
485 }
486 
/// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
/// allow access to an audio buffer and notifies the owning stream of write completion through
/// its `BufferCommit` when `commit` is called.
pub struct PlaybackBuffer<'a> {
    // The audio data and the current write position.
    buffer: AudioBuffer<'a>,
    // Commit handler invoked by `commit` with the number of frames written.
    drop: &'a mut dyn BufferCommit,
}
493 
494 impl<'a> PlaybackBuffer<'a> {
495     /// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
496     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: BufferCommit,497     pub fn new<F>(
498         frame_size: usize,
499         buffer: &'a mut [u8],
500         drop: &'a mut F,
501     ) -> Result<Self, PlaybackBufferError>
502     where
503         F: BufferCommit,
504     {
505         if buffer.len() % frame_size != 0 {
506             return Err(PlaybackBufferError::InvalidLength);
507         }
508 
509         Ok(PlaybackBuffer {
510             buffer: AudioBuffer {
511                 buffer,
512                 offset: 0,
513                 frame_size,
514             },
515             drop,
516         })
517     }
518 
519     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize520     pub fn frame_capacity(&self) -> usize {
521         self.buffer.frame_capacity()
522     }
523 
524     /// This triggers the commit of `BufferCommit`. This should be called after the data is copied
525     /// to the buffer.
commit(&mut self)526     pub fn commit(&mut self) {
527         self.drop
528             .commit(self.buffer.offset / self.buffer.frame_size);
529     }
530 
531     /// It returns how many bytes need to be consumed
532     /// before the current playback buffer will be played.
latency_bytes(&self) -> u32533     pub fn latency_bytes(&self) -> u32 {
534         self.drop.latency_bytes()
535     }
536 
537     /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>538     pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
539         self.buffer.write_copy_cb(size, cb)
540     }
541 }
542 
543 impl<'a> Write for PlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>544     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
545         self.buffer.write(buf)
546     }
547 
flush(&mut self) -> io::Result<()>548     fn flush(&mut self) -> io::Result<()> {
549         self.buffer.flush()
550     }
551 }
552 
/// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
pub struct AsyncPlaybackBuffer<'a> {
    // The audio data and the current write position.
    buffer: AudioBuffer<'a>,
    // Commit handler awaited by `commit` with the number of frames written.
    trigger: &'a mut dyn AsyncBufferCommit,
}
558 
559 impl<'a> AsyncPlaybackBuffer<'a> {
560     /// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
561     /// in `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: AsyncBufferCommit,562     pub fn new<F>(
563         frame_size: usize,
564         buffer: &'a mut [u8],
565         trigger: &'a mut F,
566     ) -> Result<Self, PlaybackBufferError>
567     where
568         F: AsyncBufferCommit,
569     {
570         if buffer.len() % frame_size != 0 {
571             return Err(PlaybackBufferError::InvalidLength);
572         }
573 
574         Ok(AsyncPlaybackBuffer {
575             buffer: AudioBuffer {
576                 buffer,
577                 offset: 0,
578                 frame_size,
579             },
580             trigger,
581         })
582     }
583 
584     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize585     pub fn frame_capacity(&self) -> usize {
586         self.buffer.frame_capacity()
587     }
588 
589     /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
590     /// copied to the buffer.
commit(&mut self)591     pub async fn commit(&mut self) {
592         self.trigger
593             .commit(self.buffer.offset / self.buffer.frame_size)
594             .await;
595     }
596 
597     /// It returns the latency in terms of bytes that the capture buffer was recorded.
latency_bytes(&self) -> u32598     pub fn latency_bytes(&self) -> u32 {
599         self.trigger.latency_bytes()
600     }
601 
602     /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>603     pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
604         self.buffer.write_copy_cb(size, cb)
605     }
606 
607     /// Copy data from an io::Reader
copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize>608     pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
609         self.buffer.copy_from(reader)
610     }
611 }
612 
613 impl<'a> Write for AsyncPlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>614     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
615         self.buffer.write(buf)
616     }
617 
flush(&mut self) -> io::Result<()>618     fn flush(&mut self) -> io::Result<()> {
619         self.buffer.flush()
620     }
621 }
/// Stream that accepts playback samples but drops them.
pub struct NoopStream {
    // Backing storage handed out to callers through playback buffers.
    buffer: Vec<u8>,
    // Size of one audio frame in bytes.
    frame_size: usize,
    // Duration of one full buffer, derived from the buffer size and the frame rate.
    interval: Duration,
    // Offset from `start_time` at which the next buffer becomes due.
    next_frame: Duration,
    // Set when the first buffer is requested; `None` until then.
    start_time: Option<Instant>,
    // Commit handler passed to every buffer created from this stream.
    buffer_drop: NoopBufferCommit,
}
631 
/// NoopStream data that is needed from the buffer complete callback.
struct NoopBufferCommit {
    // Toggled each time a buffer is committed.
    which_buffer: bool,
}
636 
637 impl BufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)638     fn commit(&mut self, _nwritten: usize) {
639         // When a buffer completes, switch to the other one.
640         self.which_buffer ^= true;
641     }
642 }
643 
644 #[async_trait(?Send)]
645 impl AsyncBufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)646     async fn commit(&mut self, _nwritten: usize) {
647         // When a buffer completes, switch to the other one.
648         self.which_buffer ^= true;
649     }
650 }
651 
652 impl NoopStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self653     pub fn new(
654         num_channels: usize,
655         format: SampleFormat,
656         frame_rate: u32,
657         buffer_size: usize,
658     ) -> Self {
659         let frame_size = format.sample_bytes() * num_channels;
660         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
661         NoopStream {
662             buffer: vec![0; buffer_size * frame_size],
663             frame_size,
664             interval,
665             next_frame: interval,
666             start_time: None,
667             buffer_drop: NoopBufferCommit {
668                 which_buffer: false,
669             },
670         }
671     }
672 }
673 
674 impl PlaybackBufferStream for NoopStream {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>675     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
676         if let Some(start_time) = self.start_time {
677             let elapsed = start_time.elapsed();
678             if elapsed < self.next_frame {
679                 std::thread::sleep(self.next_frame - elapsed);
680             }
681             self.next_frame += self.interval;
682         } else {
683             self.start_time = Some(Instant::now());
684             self.next_frame = self.interval;
685         }
686         Ok(PlaybackBuffer::new(
687             self.frame_size,
688             &mut self.buffer,
689             &mut self.buffer_drop,
690         )?)
691     }
692 }
693 
694 #[async_trait(?Send)]
695 impl AsyncPlaybackBufferStream for NoopStream {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>696     async fn next_playback_buffer<'a>(
697         &'a mut self,
698         ex: &dyn AudioStreamsExecutor,
699     ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
700         if let Some(start_time) = self.start_time {
701             let elapsed = start_time.elapsed();
702             if elapsed < self.next_frame {
703                 ex.delay(self.next_frame - elapsed).await?;
704             }
705             self.next_frame += self.interval;
706         } else {
707             self.start_time = Some(Instant::now());
708             self.next_frame = self.interval;
709         }
710         Ok(AsyncPlaybackBuffer::new(
711             self.frame_size,
712             &mut self.buffer,
713             &mut self.buffer_drop,
714         )?)
715     }
716 }
717 
/// No-op control for `NoopStream`s.
///
/// A field-less marker type; its `StreamControl` implementation overrides no methods.
#[derive(Default)]
pub struct NoopStreamControl;
721 
722 impl NoopStreamControl {
new() -> Self723     pub fn new() -> Self {
724         NoopStreamControl {}
725     }
726 }
727 
// `NoopStreamControl` relies entirely on the do-nothing default methods of `StreamControl`.
impl StreamControl for NoopStreamControl {}
729 
/// Source of `NoopStream` and `NoopStreamControl` objects.
///
/// A field-less type; every stream it creates is independent of the source itself.
#[derive(Default)]
pub struct NoopStreamSource;
733 
734 impl NoopStreamSource {
new() -> Self735     pub fn new() -> Self {
736         NoopStreamSource {}
737     }
738 }
739 
740 impl StreamSource for NoopStreamSource {
741     #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>742     fn new_playback_stream(
743         &mut self,
744         num_channels: usize,
745         format: SampleFormat,
746         frame_rate: u32,
747         buffer_size: usize,
748     ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
749         Ok((
750             Box::new(NoopStreamControl::new()),
751             Box::new(NoopStream::new(
752                 num_channels,
753                 format,
754                 frame_rate,
755                 buffer_size,
756             )),
757         ))
758     }
759 
760     #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>761     fn new_async_playback_stream(
762         &mut self,
763         num_channels: usize,
764         format: SampleFormat,
765         frame_rate: u32,
766         buffer_size: usize,
767         _ex: &dyn AudioStreamsExecutor,
768     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
769         Ok((
770             Box::new(NoopStreamControl::new()),
771             Box::new(NoopStream::new(
772                 num_channels,
773                 format,
774                 frame_rate,
775                 buffer_size,
776             )),
777         ))
778     }
779 }
780 
/// `NoopStreamSourceGenerator` is the [`StreamSourceGenerator`] implementation whose
/// generated sources are [`NoopStreamSource`]s.
#[derive(Default)]
pub struct NoopStreamSourceGenerator;

impl NoopStreamSourceGenerator {
    /// Creates a new `NoopStreamSourceGenerator`.
    pub fn new() -> Self {
        Self
    }
}
796 
797 impl StreamSourceGenerator for NoopStreamSourceGenerator {
generate(&self) -> Result<Box<dyn StreamSource>, BoxError>798     fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
799         Ok(Box::new(NoopStreamSource))
800     }
801 }
802 
#[cfg(test)]
mod tests {
    use futures::FutureExt;
    use io::Write;

    use super::async_api::test::TestExecutor;
    use super::*;

    // Buffer geometry shared by the `AudioBuffer` copy tests: 8192 frames of six channels
    // at two bytes each.
    const PERIOD_SIZE: usize = 8192;
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const PERIOD_BYTES: usize = PERIOD_SIZE * FRAME_SIZE;

    /// `PlaybackBuffer::new` must fail when the slice is not a whole number of frames.
    #[test]
    fn invalid_buffer_length() {
        // Playback buffers can't be created with a size that isn't divisible by the frame size.
        let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
        let mut buffer_drop = NoopBufferCommit {
            which_buffer: false,
        };
        assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
    }

    /// `AudioBuffer::copy_from` fills the whole buffer from a slice reader.
    #[test]
    fn audio_buffer_copy_from() {
        let mut dst_buf = [0u8; PERIOD_BYTES];
        let src_buf = [0xa5u8; PERIOD_BYTES];
        let mut aud_buf = AudioBuffer {
            buffer: &mut dst_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        aud_buf
            .copy_from(&mut &src_buf[..])
            .expect("all data should be copied.");
        assert_eq!(dst_buf, src_buf);
    }

    /// `AudioBuffer::copy_from` stops at the buffer's capacity when the reader is endless.
    #[test]
    fn audio_buffer_copy_from_repeat() {
        let mut dst_buf = [0u8; PERIOD_BYTES];
        let mut aud_buf = AudioBuffer {
            buffer: &mut dst_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        let bytes = aud_buf
            .copy_from(&mut io::repeat(1))
            .expect("all data should be copied.");
        assert_eq!(bytes, PERIOD_BYTES);
        assert_eq!(dst_buf, [1u8; PERIOD_BYTES]);
    }

    /// `AudioBuffer::copy_to` writes the whole buffer into a slice writer.
    #[test]
    fn audio_buffer_copy_to() {
        let mut dst_buf = [0u8; PERIOD_BYTES];
        let mut src_buf = [0xa5u8; PERIOD_BYTES];
        let mut aud_buf = AudioBuffer {
            buffer: &mut src_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        aud_buf
            .copy_to(&mut &mut dst_buf[..])
            .expect("all data should be copied.");
        assert_eq!(dst_buf, src_buf);
    }

    /// `AudioBuffer::copy_to` reports the full byte count when writing into `io::sink()`.
    #[test]
    fn audio_buffer_copy_to_sink() {
        let mut src_buf = [0xa5u8; PERIOD_BYTES];
        let mut aud_buf = AudioBuffer {
            buffer: &mut src_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        let bytes = aud_buf
            .copy_to(&mut io::sink())
            .expect("all data should be copied.");
        assert_eq!(bytes, PERIOD_BYTES);
    }

    /// An `AudioBuffer` can be used as the destination of `io::copy`.
    #[test]
    fn io_copy_audio_buffer() {
        let mut dst_buf = [0u8; PERIOD_BYTES];
        let src_buf = [0xa5u8; PERIOD_BYTES];
        let mut aud_buf = AudioBuffer {
            buffer: &mut dst_buf,
            offset: 0,
            frame_size: FRAME_SIZE,
        };
        io::copy(&mut &src_buf[..], &mut aud_buf).expect("all data should be copied.");
        assert_eq!(dst_buf, src_buf);
    }

    /// Committing a `PlaybackBuffer` reports the number of frames written, not bytes.
    #[test]
    fn commit() {
        struct TestCommit {
            frame_count: usize,
        }
        impl BufferCommit for TestCommit {
            fn commit(&mut self, nwritten: usize) {
                self.frame_count += nwritten;
            }
        }
        let mut test_commit = TestCommit { frame_count: 0 };
        {
            const BYTES_PER_FRAME: usize = 4;
            let mut buf = [0u8; 480 * BYTES_PER_FRAME];
            let mut pb_buf =
                PlaybackBuffer::new(BYTES_PER_FRAME, &mut buf, &mut test_commit).unwrap();
            pb_buf.write_all(&[0xa5u8; 480 * BYTES_PER_FRAME]).unwrap();
            pb_buf.commit();
        }
        assert_eq!(test_commit.frame_count, 480);
    }

    /// A 2-channel S16LE no-op stream hands out a buffer of the requested frame count.
    #[test]
    fn sixteen_bit_stereo() {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
            .unwrap();
        let mut copy_cb = |buf: &mut PlaybackBuffer| {
            assert_eq!(buf.buffer.frame_capacity(), 480);
            let pb_buf = [0xa5u8; 480 * 2 * 2];
            assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
            Ok(())
        };
        stream.write_playback_buffer(&mut copy_cb).unwrap();
    }

    /// The second buffer of a no-op stream is only handed out after more than 10 ms.
    #[test]
    fn consumption_rate() {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
            .unwrap();
        let start = Instant::now();
        {
            let mut copy_cb = |buf: &mut PlaybackBuffer| {
                let pb_buf = [0xa5u8; 480 * 2 * 2];
                assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
                Ok(())
            };
            stream.write_playback_buffer(&mut copy_cb).unwrap();
        }
        // The second call should block until the first buffer is consumed.
        let mut assert_cb = |_: &mut PlaybackBuffer| {
            let elapsed = start.elapsed();
            assert!(
                elapsed > Duration::from_millis(10),
                "next_playback_buffer didn't block long enough {}",
                elapsed.subsec_millis()
            );
            Ok(())
        };
        stream.write_playback_buffer(&mut assert_cb).unwrap();
    }

    /// Committing an `AsyncPlaybackBuffer` reports the number of frames written, not bytes.
    #[test]
    fn async_commit() {
        struct TestCommit {
            frame_count: usize,
        }
        #[async_trait(?Send)]
        impl AsyncBufferCommit for TestCommit {
            async fn commit(&mut self, nwritten: usize) {
                self.frame_count += nwritten;
            }
        }
        async fn this_test() {
            let mut test_commit = TestCommit { frame_count: 0 };
            {
                const BYTES_PER_FRAME: usize = 4;
                let mut buf = [0u8; 480 * BYTES_PER_FRAME];
                let mut pb_buf =
                    AsyncPlaybackBuffer::new(BYTES_PER_FRAME, &mut buf, &mut test_commit).unwrap();
                pb_buf.write_all(&[0xa5u8; 480 * BYTES_PER_FRAME]).unwrap();
                pb_buf.commit().await;
            }
            assert_eq!(test_commit.frame_count, 480);
        }

        // `now_or_never` polls exactly once. Require completion so the assertions above
        // cannot be skipped silently if the future ever returns `Pending`.
        this_test()
            .now_or_never()
            .expect("async_commit future should complete on the first poll");
    }

    /// Async variant of `consumption_rate`, driven by `TestExecutor`.
    #[test]
    fn consumption_rate_async() {
        async fn this_test(ex: &TestExecutor) {
            let mut server = NoopStreamSource::new();
            let (_, mut stream) = server
                .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
                .unwrap();
            let start = Instant::now();
            {
                let copy_func = |buf: &mut AsyncPlaybackBuffer| {
                    let pb_buf = [0xa5u8; 480 * 2 * 2];
                    assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
                    Ok(())
                };
                async_write_playback_buffer(&mut *stream, copy_func, ex)
                    .await
                    .unwrap();
            }
            // The second call should block until the first buffer is consumed.
            let assert_func = |_: &mut AsyncPlaybackBuffer| {
                let elapsed = start.elapsed();
                assert!(
                    elapsed > Duration::from_millis(10),
                    "write_playback_buffer didn't block long enough {}",
                    elapsed.subsec_millis()
                );
                Ok(())
            };

            async_write_playback_buffer(&mut *stream, assert_func, ex)
                .await
                .unwrap();
        }

        let ex = TestExecutor {};
        // `now_or_never` polls exactly once. Require completion so the timing assertion
        // cannot be skipped silently if the future ever returns `Pending`.
        // NOTE(review): assumes `TestExecutor::delay` finishes within a single poll — confirm.
        this_test(&ex)
            .now_or_never()
            .expect("consumption_rate_async future should complete on the first poll");
    }

    /// `NoopStreamSourceGenerator` is usable through a boxed `StreamSourceGenerator`.
    #[test]
    fn generate_noop_stream_source() {
        let generator: Box<dyn StreamSourceGenerator> = Box::new(NoopStreamSourceGenerator::new());
        generator
            .generate()
            .expect("failed to generate stream source");
    }
}
1045