1 // Copyright 2019 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Provides an interface for playing and recording audio.
6 //!
7 //! When implementing an audio playback system, the `StreamSource` trait is implemented.
8 //! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9 //! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10 //! buffers can be filled with `write_playback_buffer`.
11 //!
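//! Users playing audio fill the provided buffers with audio. The samples written to a
//! `PlaybackBuffer` are committed to the `PlaybackBufferStream` it came from when the buffer's
//! `commit` method is called; `write_playback_buffer` does this automatically after the callback
//! returns successfully.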
14 //!
15 //! ```
16 //! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17 //! use std::io::Write;
18 //!
19 //! const buffer_size: usize = 120;
20 //! const num_channels: usize = 2;
21 //!
22 //! # fn main() -> std::result::Result<(), BoxError> {
23 //! let mut stream_source = NoopStreamSource::new();
24 //! let sample_format = SampleFormat::S16LE;
25 //! let frame_size = num_channels * sample_format.sample_bytes();
26 //!
27 //! let (_, mut stream) = stream_source
28 //! .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29 //! // Play 10 buffers of DC.
30 //! let mut buf = Vec::new();
31 //! buf.resize(buffer_size * frame_size, 0xa5u8);
32 //! for _ in 0..10 {
33 //! let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34 //! assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35 //! Ok(())
36 //! };
37 //! stream.write_playback_buffer(&mut copy_cb)?;
38 //! }
39 //! # Ok (())
40 //! # }
41 //! ```
42 pub mod async_api;
43
44 use std::cmp::min;
45 use std::error;
46 use std::fmt;
47 use std::fmt::Display;
48 use std::io;
49 use std::io::Read;
50 use std::io::Write;
51 #[cfg(unix)]
52 use std::os::unix::io::RawFd as RawDescriptor;
53 #[cfg(windows)]
54 use std::os::windows::io::RawHandle as RawDescriptor;
55 use std::result::Result;
56 use std::str::FromStr;
57 use std::time::Duration;
58 use std::time::Instant;
59
60 pub use async_api::AsyncStream;
61 pub use async_api::AudioStreamsExecutor;
62 use async_trait::async_trait;
63 use remain::sorted;
64 use serde::Deserialize;
65 use serde::Serialize;
66 use thiserror::Error;
67
/// Sample encodings supported for audio streams.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SampleFormat {
    /// Unsigned 8 bit samples.
    U8,
    /// Signed 16 bit little endian samples.
    S16LE,
    /// Signed 24 bit little endian samples, stored in 4 byte chunks.
    S24LE,
    /// Signed 32 bit little endian samples.
    S32LE,
}
75
76 impl SampleFormat {
sample_bytes(self) -> usize77 pub fn sample_bytes(self) -> usize {
78 use SampleFormat::*;
79 match self {
80 U8 => 1,
81 S16LE => 2,
82 S24LE => 4, // Not a typo, S24_LE samples are stored in 4 byte chunks.
83 S32LE => 4,
84 }
85 }
86 }
87
88 impl Display for SampleFormat {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result89 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90 use SampleFormat::*;
91 match self {
92 U8 => write!(f, "Unsigned 8 bit"),
93 S16LE => write!(f, "Signed 16 bit Little Endian"),
94 S24LE => write!(f, "Signed 24 bit Little Endian"),
95 S32LE => write!(f, "Signed 32 bit Little Endian"),
96 }
97 }
98 }
99
100 impl FromStr for SampleFormat {
101 type Err = SampleFormatError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>102 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
103 match s {
104 "U8" => Ok(SampleFormat::U8),
105 "S16_LE" => Ok(SampleFormat::S16LE),
106 "S24_LE" => Ok(SampleFormat::S24LE),
107 "S32_LE" => Ok(SampleFormat::S32LE),
108 _ => Err(SampleFormatError::InvalidSampleFormat),
109 }
110 }
111 }
112
/// Errors that are possible from a `SampleFormat`.
#[sorted]
#[derive(Error, Debug)]
pub enum SampleFormatError {
    /// The string being parsed is not one of the names accepted by `SampleFormat::from_str`.
    #[error("Must be in [U8, S16_LE, S24_LE, S32_LE]")]
    InvalidSampleFormat,
}
120
/// Valid directions of an audio stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StreamDirection {
    /// The stream plays audio back.
    Playback,
    /// The stream captures audio.
    Capture,
}
127
/// Valid effects for an audio stream.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub enum StreamEffect {
    /// No effect is applied. This is the `Default` value.
    #[default]
    NoEffect,
    /// Echo cancellation. Also deserializes from the serde alias `aec`.
    #[serde(alias = "aec")]
    EchoCancellation,
}
136
137 pub mod capture;
138 pub mod shm_streams;
139
140 /// Errors that can pass across threads.
141 pub type BoxError = Box<dyn error::Error + Send + Sync>;
142
/// Errors that are possible from a `StreamEffect`.
#[sorted]
#[derive(Error, Debug)]
pub enum StreamEffectError {
    /// The string being parsed is not one of the names accepted by `StreamEffect::from_str`.
    #[error("Must be in [EchoCancellation, aec]")]
    InvalidEffect,
}
150
151 impl FromStr for StreamEffect {
152 type Err = StreamEffectError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>153 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
154 match s {
155 "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
156 _ => Err(StreamEffectError::InvalidEffect),
157 }
158 }
159 }
160
/// General errors returned by this module.
#[sorted]
#[derive(Error, Debug)]
pub enum Error {
    /// The requested operation has no implementation; returned by the default
    /// `StreamSource::new_async_playback_stream`.
    #[error("Unimplemented")]
    Unimplemented,
}
167
/// `StreamSourceGenerator` is a trait used to abstract types that create [`StreamSource`].
/// It can be used when multiple types of `StreamSource` are needed.
pub trait StreamSourceGenerator: Sync + Send {
    /// Creates a new boxed [`StreamSource`], or returns the error that prevented its creation.
    fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError>;
}
173
/// `StreamSource` creates streams for playback or capture of audio.
///
/// Only `new_playback_stream` has to be provided by implementors; every other method has a
/// default implementation.
#[async_trait(?Send)]
pub trait StreamSource: Send {
    /// Returns a stream control and buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    #[allow(clippy::type_complexity)]
    fn new_playback_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
    ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;

    /// Returns a stream control and async buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    /// Default implementation ignores its arguments and returns `Error::Unimplemented`.
    #[allow(clippy::type_complexity)]
    fn new_async_playback_stream(
        &mut self,
        _num_channels: usize,
        _format: SampleFormat,
        _frame_rate: u32,
        _buffer_size: usize,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
        Err(Box::new(Error::Unimplemented))
    }

    /// Returns a stream control and async buffer generator object asynchronously.
    /// Default implementation calls and blocks on `new_async_playback_stream()`.
    #[allow(clippy::type_complexity)]
    async fn async_new_async_playback_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        ex: &dyn AudioStreamsExecutor,
    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
        // Forward to the synchronous constructor with the same arguments.
        self.new_async_playback_stream(num_channels, format, frame_rate, buffer_size, ex)
    }

    /// Returns a stream control and buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`; the requested
    /// effects are ignored by this default.
    #[allow(clippy::type_complexity)]
    fn new_capture_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        _effects: &[StreamEffect],
    ) -> Result<
        (
            Box<dyn StreamControl>,
            Box<dyn capture::CaptureBufferStream>,
        ),
        BoxError,
    > {
        Ok((
            Box::new(NoopStreamControl::new()),
            Box::new(capture::NoopCaptureStream::new(
                num_channels,
                format,
                frame_rate,
                buffer_size,
            )),
        ))
    }

    /// Returns a stream control and async buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`; the requested
    /// effects and the executor are ignored by this default.
    #[allow(clippy::type_complexity)]
    fn new_async_capture_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        _effects: &[StreamEffect],
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<
        (
            Box<dyn StreamControl>,
            Box<dyn capture::AsyncCaptureBufferStream>,
        ),
        BoxError,
    > {
        Ok((
            Box::new(NoopStreamControl::new()),
            Box::new(capture::NoopCaptureStream::new(
                num_channels,
                format,
                frame_rate,
                buffer_size,
            )),
        ))
    }

    /// Returns a stream control and async buffer generator object asynchronously.
    /// Default implementation calls and blocks on `new_async_capture_stream()`.
    #[allow(clippy::type_complexity)]
    async fn async_new_async_capture_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        effects: &[StreamEffect],
        ex: &dyn AudioStreamsExecutor,
    ) -> Result<
        (
            Box<dyn StreamControl>,
            Box<dyn capture::AsyncCaptureBufferStream>,
        ),
        BoxError,
    > {
        // Forward to the synchronous constructor with the same arguments.
        self.new_async_capture_stream(num_channels, format, frame_rate, buffer_size, effects, ex)
    }

    /// Returns any open file descriptors needed by the implementor. The FD list helps users of the
    /// StreamSource enter Linux jails making sure not to close needed FDs.
    /// Default implementation returns `None`.
    fn keep_rds(&self) -> Option<Vec<RawDescriptor>> {
        None
    }
}
302
303 /// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
304 pub trait PlaybackBufferStream: Send {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>305 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;
306
307 /// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
308 /// write playback data to the given `PlaybackBuffer`.
write_playback_buffer<'b, 's: 'b>( &'s mut self, f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>, ) -> Result<(), BoxError>309 fn write_playback_buffer<'b, 's: 'b>(
310 &'s mut self,
311 f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
312 ) -> Result<(), BoxError> {
313 let mut buf = self.next_playback_buffer()?;
314 f(&mut buf)?;
315 buf.commit();
316 Ok(())
317 }
318 }
319
320 impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>321 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
322 (**self).next_playback_buffer()
323 }
324 }
325
/// `AsyncPlaybackBufferStream` provides `AsyncPlaybackBuffer`s asynchronously to fill with audio
/// samples for playback.
#[async_trait(?Send)]
pub trait AsyncPlaybackBufferStream: Send {
    /// Returns the next `AsyncPlaybackBuffer` to be filled with samples. The executor is made
    /// available to implementations that need to wait asynchronously.
    async fn next_playback_buffer<'a>(
        &'a mut self,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
}
335
336 #[async_trait(?Send)]
337 impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>338 async fn next_playback_buffer<'a>(
339 &'a mut self,
340 ex: &dyn AudioStreamsExecutor,
341 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
342 (**self).next_playback_buffer(ex).await
343 }
344 }
345
346 /// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
347 /// write playback data to the given `AsyncPlaybackBuffer`.
348 ///
349 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_write_playback_buffer<F>( stream: &mut dyn AsyncPlaybackBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,350 pub async fn async_write_playback_buffer<F>(
351 stream: &mut dyn AsyncPlaybackBufferStream,
352 f: F,
353 ex: &dyn AudioStreamsExecutor,
354 ) -> Result<(), BoxError>
355 where
356 F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
357 {
358 let mut buf = stream.next_playback_buffer(ex).await?;
359 f(&mut buf)?;
360 buf.commit().await;
361 Ok(())
362 }
363
/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
/// is separate from the stream so it can be owned by a different thread if needed.
pub trait StreamControl: Send + Sync {
    /// Sets the volume of the stream. The default implementation does nothing.
    fn set_volume(&mut self, _scaler: f64) {}
    /// Sets the mute state of the stream. The default implementation does nothing.
    fn set_mute(&mut self, _mute: bool) {}
}
370
/// `BufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
pub trait BufferCommit {
    /// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
    /// indicates the number of audio frames that were read or written to the device.
    fn commit(&mut self, nframes: usize);
    /// `latency_bytes` the current device latency.
    /// For playback it means how many bytes need to be consumed
    /// before the current playback buffer will be played.
    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
    /// The default implementation reports a latency of 0.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
385
/// `AsyncBufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
#[async_trait(?Send)]
pub trait AsyncBufferCommit {
    /// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
    /// automatically. `nframes` indicates the number of audio frames that were read or written to
    /// the device.
    async fn commit(&mut self, nframes: usize);
    /// `latency_bytes` the current device latency.
    /// For playback it means how many bytes need to be consumed
    /// before the current playback buffer will be played.
    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
    /// The default implementation reports a latency of 0.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
402
/// Errors that are possible from a `PlaybackBuffer`.
#[sorted]
#[derive(Error, Debug)]
pub enum PlaybackBufferError {
    /// The backing buffer length is not a multiple of the frame size.
    #[error("Invalid buffer length")]
    InvalidLength,
    /// A slice of the playback buffer was requested outside of its bounds.
    #[error("Slicing of playback buffer out of bounds")]
    SliceOutOfBounds,
}
412
/// `AudioBuffer` is one buffer that holds buffer_size audio frames.
/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
struct AudioBuffer<'a> {
    buffer: &'a mut [u8],
    // Read or write position as a byte offset into `buffer`. It is used directly as a slice
    // index, and owners divide it by `frame_size` to obtain a frame count.
    offset: usize,
    frame_size: usize, // Size of a frame in bytes.
}

impl<'a> AudioBuffer<'a> {
    /// Returns the number of audio frames that fit in the buffer.
    pub fn frame_capacity(&self) -> usize {
        self.buffer.len() / self.frame_size
    }

    /// Returns how many bytes a transfer of up to `size` bytes may cover: `size` rounded down to
    /// whole frames, capped by the number of bytes left after `offset`.
    fn calc_len(&self, size: usize) -> usize {
        min(
            size / self.frame_size * self.frame_size,
            self.buffer.len() - self.offset,
        )
    }

    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
    ///
    /// The callback receives the destination slice (see `calc_len` for its length) and the
    /// offset advances by that length. Returns the length of the slice.
    pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        // only write complete frames.
        let len = self.calc_len(size);
        cb(&mut self.buffer[self.offset..(self.offset + len)]);
        self.offset += len;
        Ok(len)
    }

    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
    ///
    /// The callback receives the source slice (see `calc_len` for its length) and the offset
    /// advances by that length. Returns the length of the slice.
    pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        cb(&self.buffer[self.offset..(self.offset + len)]);
        self.offset += len;
        Ok(len)
    }

    /// Copy data from an io::Reader
    ///
    /// Performs a single `read` into the unused tail of the buffer and advances the offset by the
    /// number of bytes the reader produced.
    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
        let bytes = reader.read(&mut self.buffer[self.offset..])?;
        self.offset += bytes;
        Ok(bytes)
    }

    /// Copy data to an io::Write
    ///
    /// Performs a single `write` of the bytes after the offset and advances the offset by the
    /// number of bytes the writer accepted.
    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
        let bytes = writer.write(&self.buffer[self.offset..])?;
        self.offset += bytes;
        Ok(bytes)
    }
}

impl<'a> Write for AudioBuffer<'a> {
    /// Copies as much of `buf` as fits after the current offset and advances the offset by the
    /// number of bytes copied.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written = (&mut self.buffer[self.offset..]).write(buf)?;
        self.offset += written;
        Ok(written)
    }

    /// Nothing is buffered beyond the backing slice, so flushing is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl<'a> Read for AudioBuffer<'a> {
    /// Copies data from the current offset into `buf`, using only the whole-frame prefix of
    /// `buf`, and advances the offset by the number of bytes copied.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let len = buf.len() / self.frame_size * self.frame_size;
        let written = (&mut buf[..len]).write(&self.buffer[self.offset..])?;
        self.offset += written;
        Ok(written)
    }
}
486
487 /// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
488 /// allow access to an audio buffer and notifes the owning stream of write completion when dropped.
489 pub struct PlaybackBuffer<'a> {
490 buffer: AudioBuffer<'a>,
491 drop: &'a mut dyn BufferCommit,
492 }
493
494 impl<'a> PlaybackBuffer<'a> {
495 /// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
496 /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: BufferCommit,497 pub fn new<F>(
498 frame_size: usize,
499 buffer: &'a mut [u8],
500 drop: &'a mut F,
501 ) -> Result<Self, PlaybackBufferError>
502 where
503 F: BufferCommit,
504 {
505 if buffer.len() % frame_size != 0 {
506 return Err(PlaybackBufferError::InvalidLength);
507 }
508
509 Ok(PlaybackBuffer {
510 buffer: AudioBuffer {
511 buffer,
512 offset: 0,
513 frame_size,
514 },
515 drop,
516 })
517 }
518
519 /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize520 pub fn frame_capacity(&self) -> usize {
521 self.buffer.frame_capacity()
522 }
523
524 /// This triggers the commit of `BufferCommit`. This should be called after the data is copied
525 /// to the buffer.
commit(&mut self)526 pub fn commit(&mut self) {
527 self.drop
528 .commit(self.buffer.offset / self.buffer.frame_size);
529 }
530
531 /// It returns how many bytes need to be consumed
532 /// before the current playback buffer will be played.
latency_bytes(&self) -> u32533 pub fn latency_bytes(&self) -> u32 {
534 self.drop.latency_bytes()
535 }
536
537 /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>538 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
539 self.buffer.write_copy_cb(size, cb)
540 }
541 }
542
543 impl<'a> Write for PlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>544 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
545 self.buffer.write(buf)
546 }
547
flush(&mut self) -> io::Result<()>548 fn flush(&mut self) -> io::Result<()> {
549 self.buffer.flush()
550 }
551 }
552
553 /// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
554 pub struct AsyncPlaybackBuffer<'a> {
555 buffer: AudioBuffer<'a>,
556 trigger: &'a mut dyn AsyncBufferCommit,
557 }
558
559 impl<'a> AsyncPlaybackBuffer<'a> {
560 /// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
561 /// in `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: AsyncBufferCommit,562 pub fn new<F>(
563 frame_size: usize,
564 buffer: &'a mut [u8],
565 trigger: &'a mut F,
566 ) -> Result<Self, PlaybackBufferError>
567 where
568 F: AsyncBufferCommit,
569 {
570 if buffer.len() % frame_size != 0 {
571 return Err(PlaybackBufferError::InvalidLength);
572 }
573
574 Ok(AsyncPlaybackBuffer {
575 buffer: AudioBuffer {
576 buffer,
577 offset: 0,
578 frame_size,
579 },
580 trigger,
581 })
582 }
583
584 /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize585 pub fn frame_capacity(&self) -> usize {
586 self.buffer.frame_capacity()
587 }
588
589 /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
590 /// copied to the buffer.
commit(&mut self)591 pub async fn commit(&mut self) {
592 self.trigger
593 .commit(self.buffer.offset / self.buffer.frame_size)
594 .await;
595 }
596
597 /// It returns the latency in terms of bytes that the capture buffer was recorded.
latency_bytes(&self) -> u32598 pub fn latency_bytes(&self) -> u32 {
599 self.trigger.latency_bytes()
600 }
601
602 /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>603 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
604 self.buffer.write_copy_cb(size, cb)
605 }
606
607 /// Copy data from an io::Reader
copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize>608 pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
609 self.buffer.copy_from(reader)
610 }
611 }
612
613 impl<'a> Write for AsyncPlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>614 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
615 self.buffer.write(buf)
616 }
617
flush(&mut self) -> io::Result<()>618 fn flush(&mut self) -> io::Result<()> {
619 self.buffer.flush()
620 }
621 }
/// Stream that accepts playback samples but drops them.
pub struct NoopStream {
    /// Backing memory handed out with every playback buffer.
    buffer: Vec<u8>,
    /// Size of one audio frame in bytes.
    frame_size: usize,
    /// Pacing step derived from the buffer size and frame rate; added to `next_frame` after each
    /// buffer request.
    interval: Duration,
    /// Offset from `start_time` until which the next buffer request waits.
    next_frame: Duration,
    /// Time of the first buffer request; `None` until a buffer has been requested.
    start_time: Option<Instant>,
    /// Commit handler passed to each buffer created from this stream.
    buffer_drop: NoopBufferCommit,
}
631
632 /// NoopStream data that is needed from the buffer complete callback.
633 struct NoopBufferCommit {
634 which_buffer: bool,
635 }
636
637 impl BufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)638 fn commit(&mut self, _nwritten: usize) {
639 // When a buffer completes, switch to the other one.
640 self.which_buffer ^= true;
641 }
642 }
643
644 #[async_trait(?Send)]
645 impl AsyncBufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)646 async fn commit(&mut self, _nwritten: usize) {
647 // When a buffer completes, switch to the other one.
648 self.which_buffer ^= true;
649 }
650 }
651
652 impl NoopStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self653 pub fn new(
654 num_channels: usize,
655 format: SampleFormat,
656 frame_rate: u32,
657 buffer_size: usize,
658 ) -> Self {
659 let frame_size = format.sample_bytes() * num_channels;
660 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
661 NoopStream {
662 buffer: vec![0; buffer_size * frame_size],
663 frame_size,
664 interval,
665 next_frame: interval,
666 start_time: None,
667 buffer_drop: NoopBufferCommit {
668 which_buffer: false,
669 },
670 }
671 }
672 }
673
674 impl PlaybackBufferStream for NoopStream {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>675 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
676 if let Some(start_time) = self.start_time {
677 let elapsed = start_time.elapsed();
678 if elapsed < self.next_frame {
679 std::thread::sleep(self.next_frame - elapsed);
680 }
681 self.next_frame += self.interval;
682 } else {
683 self.start_time = Some(Instant::now());
684 self.next_frame = self.interval;
685 }
686 Ok(PlaybackBuffer::new(
687 self.frame_size,
688 &mut self.buffer,
689 &mut self.buffer_drop,
690 )?)
691 }
692 }
693
694 #[async_trait(?Send)]
695 impl AsyncPlaybackBufferStream for NoopStream {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>696 async fn next_playback_buffer<'a>(
697 &'a mut self,
698 ex: &dyn AudioStreamsExecutor,
699 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
700 if let Some(start_time) = self.start_time {
701 let elapsed = start_time.elapsed();
702 if elapsed < self.next_frame {
703 ex.delay(self.next_frame - elapsed).await?;
704 }
705 self.next_frame += self.interval;
706 } else {
707 self.start_time = Some(Instant::now());
708 self.next_frame = self.interval;
709 }
710 Ok(AsyncPlaybackBuffer::new(
711 self.frame_size,
712 &mut self.buffer,
713 &mut self.buffer_drop,
714 )?)
715 }
716 }
717
718 /// No-op control for `NoopStream`s.
719 #[derive(Default)]
720 pub struct NoopStreamControl;
721
722 impl NoopStreamControl {
new() -> Self723 pub fn new() -> Self {
724 NoopStreamControl {}
725 }
726 }
727
728 impl StreamControl for NoopStreamControl {}
729
/// Source of `NoopStream` and `NoopStreamControl` objects.
#[derive(Default)]
pub struct NoopStreamSource;

impl NoopStreamSource {
    /// Creates a new no-op stream source.
    pub fn new() -> Self {
        Self
    }
}
739
740 impl StreamSource for NoopStreamSource {
741 #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>742 fn new_playback_stream(
743 &mut self,
744 num_channels: usize,
745 format: SampleFormat,
746 frame_rate: u32,
747 buffer_size: usize,
748 ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
749 Ok((
750 Box::new(NoopStreamControl::new()),
751 Box::new(NoopStream::new(
752 num_channels,
753 format,
754 frame_rate,
755 buffer_size,
756 )),
757 ))
758 }
759
760 #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>761 fn new_async_playback_stream(
762 &mut self,
763 num_channels: usize,
764 format: SampleFormat,
765 frame_rate: u32,
766 buffer_size: usize,
767 _ex: &dyn AudioStreamsExecutor,
768 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
769 Ok((
770 Box::new(NoopStreamControl::new()),
771 Box::new(NoopStream::new(
772 num_channels,
773 format,
774 frame_rate,
775 buffer_size,
776 )),
777 ))
778 }
779 }
780
781 /// `NoopStreamSourceGenerator` is a struct that implements [`StreamSourceGenerator`]
782 /// to generate [`NoopStreamSource`].
783 pub struct NoopStreamSourceGenerator;
784
785 impl NoopStreamSourceGenerator {
new() -> Self786 pub fn new() -> Self {
787 NoopStreamSourceGenerator {}
788 }
789 }
790
791 impl Default for NoopStreamSourceGenerator {
default() -> Self792 fn default() -> Self {
793 Self::new()
794 }
795 }
796
797 impl StreamSourceGenerator for NoopStreamSourceGenerator {
generate(&self) -> Result<Box<dyn StreamSource>, BoxError>798 fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
799 Ok(Box::new(NoopStreamSource))
800 }
801 }
802
803 #[cfg(test)]
804 mod tests {
805 use futures::FutureExt;
806 use io::Write;
807
808 use super::async_api::test::TestExecutor;
809 use super::*;
810
#[test]
fn invalid_buffer_length() {
    // A buffer whose length is not a multiple of the frame size must be rejected.
    let mut commit = NoopBufferCommit {
        which_buffer: false,
    };
    let mut odd_sized = [0xa5u8; 480 * 2 * 2 + 1];
    let result = PlaybackBuffer::new(2, &mut odd_sized, &mut commit);
    assert!(result.is_err());
}
820
#[test]
fn audio_buffer_copy_from() {
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const PERIOD_SIZE: usize = 8192;
    const BUFFER_BYTES: usize = PERIOD_SIZE * FRAME_SIZE;

    let source = [0xa5u8; BUFFER_BYTES];
    let mut destination = [0u8; BUFFER_BYTES];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut destination,
        offset: 0,
        frame_size: FRAME_SIZE,
    };
    // A byte slice acts as the reader supplying the data.
    let mut reader = &source[..];
    audio_buffer
        .copy_from(&mut reader)
        .expect("all data should be copied.");
    assert_eq!(destination, source);
}
838
#[test]
fn audio_buffer_copy_from_repeat() {
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const PERIOD_SIZE: usize = 8192;
    const BUFFER_BYTES: usize = PERIOD_SIZE * FRAME_SIZE;

    let mut destination = [0u8; BUFFER_BYTES];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut destination,
        offset: 0,
        frame_size: FRAME_SIZE,
    };
    // An endless reader of ones must fill the whole buffer in one call.
    let mut ones = io::repeat(1);
    let copied = audio_buffer
        .copy_from(&mut ones)
        .expect("all data should be copied.");
    assert_eq!(copied, BUFFER_BYTES);
    assert_eq!(destination, [1u8; BUFFER_BYTES]);
}
856
#[test]
fn audio_buffer_copy_to() {
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const PERIOD_SIZE: usize = 8192;
    const BUFFER_BYTES: usize = PERIOD_SIZE * FRAME_SIZE;

    let mut source = [0xa5u8; BUFFER_BYTES];
    let mut destination = [0u8; BUFFER_BYTES];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut source,
        offset: 0,
        frame_size: FRAME_SIZE,
    };
    // A mutable byte slice acts as the writer receiving the data.
    let mut writer = &mut destination[..];
    audio_buffer
        .copy_to(&mut writer)
        .expect("all data should be copied.");
    assert_eq!(destination, source);
}
874
#[test]
fn audio_buffer_copy_to_sink() {
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const PERIOD_SIZE: usize = 8192;
    const BUFFER_BYTES: usize = PERIOD_SIZE * FRAME_SIZE;

    let mut source = [0xa5u8; BUFFER_BYTES];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut source,
        offset: 0,
        frame_size: FRAME_SIZE,
    };
    // A sink accepts everything, so the whole buffer is consumed in one call.
    let mut sink = io::sink();
    let copied = audio_buffer
        .copy_to(&mut sink)
        .expect("all data should be copied.");
    assert_eq!(copied, BUFFER_BYTES);
}
891
/// Checks that an `AudioBuffer` can be used as the writer side of `std::io::copy`.
#[test]
fn io_copy_audio_buffer() {
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const PERIOD_SIZE: usize = 8192;
    const TOTAL_BYTES: usize = PERIOD_SIZE * FRAME_SIZE;

    let source = [0xa5u8; TOTAL_BYTES];
    let mut destination = [0u8; TOTAL_BYTES];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut destination,
        offset: 0,
        frame_size: FRAME_SIZE,
    };

    // Drive the transfer through the generic reader-to-writer copy.
    let mut reader = &source[..];
    let copy_result = io::copy(&mut reader, &mut audio_buffer);
    copy_result.expect("all data should be copied.");

    assert_eq!(destination, source);
}
907
/// Checks that committing a fully written `PlaybackBuffer` reports the amount written to the
/// `BufferCommit` callback. The expected value is 480 for 480 * 4 bytes written, i.e. the count
/// is in frames rather than bytes.
#[test]
fn commit() {
    /// Accumulates whatever counts the playback buffer hands to `commit`.
    struct FrameCounter {
        committed: usize,
    }
    impl BufferCommit for FrameCounter {
        fn commit(&mut self, nwritten: usize) {
            self.committed += nwritten;
        }
    }

    const FRAME_SIZE: usize = 4;
    const FRAME_COUNT: usize = 480;

    let mut counter = FrameCounter { committed: 0 };
    {
        // Scope the playback buffer so its borrow of `counter` ends before the final check.
        let mut backing = [0u8; FRAME_COUNT * FRAME_SIZE];
        let samples = [0xa5u8; FRAME_COUNT * FRAME_SIZE];
        let mut playback = PlaybackBuffer::new(FRAME_SIZE, &mut backing, &mut counter).unwrap();
        playback.write_all(&samples).unwrap();
        playback.commit();
    }
    assert_eq!(counter.committed, FRAME_COUNT);
}
928
/// Checks that a 2-channel S16LE playback stream of 480 frames hands out a buffer with a
/// 480-frame capacity that accepts 480 * 2 * 2 bytes in one write.
#[test]
fn sixteen_bit_stereo() {
    const CHANNELS: usize = 2;
    const FRAMES: usize = 480;
    const BUFFER_BYTES: usize = FRAMES * 2 * 2;

    let mut source = NoopStreamSource::new();
    let (_, mut stream) = source
        .new_playback_stream(CHANNELS, SampleFormat::S16LE, 48000, FRAMES)
        .unwrap();

    let mut fill_buffer = |buf: &mut PlaybackBuffer| {
        assert_eq!(buf.buffer.frame_capacity(), FRAMES);
        let payload = [0xa5u8; BUFFER_BYTES];
        let written = buf.write(&payload).unwrap();
        assert_eq!(written, BUFFER_BYTES);
        Ok(())
    };
    stream.write_playback_buffer(&mut fill_buffer).unwrap();
}
943
/// Checks that the stream paces playback: after one buffer has been written, obtaining the next
/// buffer must not happen until more than 10 ms have passed since the test started.
#[test]
fn consumption_rate() {
    const CHANNELS: usize = 2;
    const FRAMES: usize = 480;
    const BUFFER_BYTES: usize = FRAMES * 2 * 2;

    let mut source = NoopStreamSource::new();
    let (_, mut stream) = source
        .new_playback_stream(CHANNELS, SampleFormat::S16LE, 48000, FRAMES)
        .unwrap();
    let begin = Instant::now();
    {
        // First buffer: fill it completely so the stream has something to consume.
        let mut fill_buffer = |buf: &mut PlaybackBuffer| {
            let payload = [0xa5u8; BUFFER_BYTES];
            let written = buf.write(&payload).unwrap();
            assert_eq!(written, BUFFER_BYTES);
            Ok(())
        };
        stream.write_playback_buffer(&mut fill_buffer).unwrap();
    }
    // The second call should block until the first buffer is consumed.
    let mut check_elapsed = |_: &mut PlaybackBuffer| {
        let waited = begin.elapsed();
        assert!(
            waited > Duration::from_millis(10),
            "next_playback_buffer didn't block long enough {}",
            waited.subsec_millis()
        );
        Ok(())
    };
    stream.write_playback_buffer(&mut check_elapsed).unwrap();
}
971
/// Checks that committing a fully written `AsyncPlaybackBuffer` reports the amount written to
/// the `AsyncBufferCommit` callback (480 is expected for 480 * 4 bytes written).
#[test]
fn async_commit() {
    /// Accumulates whatever counts the playback buffer hands to `commit`.
    struct TestCommit {
        frame_count: usize,
    }
    #[async_trait(?Send)]
    impl AsyncBufferCommit for TestCommit {
        async fn commit(&mut self, nwritten: usize) {
            self.frame_count += nwritten;
        }
    }
    async fn this_test() {
        let mut test_commit = TestCommit { frame_count: 0 };
        {
            // Scope the playback buffer so its borrow of `test_commit` ends before the check.
            const FRAME_SIZE: usize = 4;
            let mut buf = [0u8; 480 * FRAME_SIZE];
            let mut pb_buf =
                AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
            pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
            pb_buf.commit().await;
        }
        assert_eq!(test_commit.frame_count, 480);
    }

    // `now_or_never` polls the future exactly once and yields `None` if it is still pending.
    // Ignoring that result would let the test pass without ever reaching the assertion above,
    // so require that the future actually ran to completion.
    this_test()
        .now_or_never()
        .expect("async commit future should complete on its first poll");
}
998
/// Async counterpart of the consumption-rate check: after one buffer has been written, the next
/// buffer must not be handed out until more than 10 ms have passed since the test started.
#[test]
fn consumption_rate_async() {
    async fn this_test(ex: &TestExecutor) {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
            .unwrap();
        let start = Instant::now();
        {
            // First buffer: fill it completely so the stream has something to consume.
            let copy_func = |buf: &mut AsyncPlaybackBuffer| {
                let pb_buf = [0xa5u8; 480 * 2 * 2];
                assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
                Ok(())
            };
            async_write_playback_buffer(&mut *stream, copy_func, ex)
                .await
                .unwrap();
        }
        // The second call should block until the first buffer is consumed.
        let assert_func = |_: &mut AsyncPlaybackBuffer| {
            let elapsed = start.elapsed();
            assert!(
                elapsed > Duration::from_millis(10),
                "write_playback_buffer didn't block long enough {}",
                elapsed.subsec_millis()
            );
            Ok(())
        };

        async_write_playback_buffer(&mut *stream, assert_func, ex)
            .await
            .unwrap();
    }

    let ex = TestExecutor {};
    // `now_or_never` polls the future exactly once and yields `None` if it is still pending.
    // Ignoring that result would let the test pass without ever running the timing assertion,
    // so require completion.
    // NOTE(review): this assumes `TestExecutor` finishes its delays within a single poll
    // (e.g. by blocking the thread) -- confirm against its implementation.
    this_test(&ex)
        .now_or_never()
        .expect("async consumption-rate future should complete on its first poll");
}
1036
/// Checks that a `NoopStreamSourceGenerator`, used through the `StreamSourceGenerator` trait
/// object, produces a stream source without error.
#[test]
fn generate_noop_stream_source() {
    let boxed_generator: Box<dyn StreamSourceGenerator> =
        Box::new(NoopStreamSourceGenerator::new());
    let outcome = boxed_generator.generate();
    outcome.expect("failed to generate stream source");
}
1044 }
1045