xref: /aosp_15_r20/external/crosvm/devices/src/virtio/video/utils.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // Not all video backends make use of the tools in this module.
6 #![allow(dead_code)]
7 
8 use std::collections::btree_map::Entry;
9 use std::collections::BTreeMap;
10 use std::collections::VecDeque;
11 use std::time::Duration;
12 
13 use base::AsRawDescriptor;
14 use base::Event;
15 use base::EventExt;
16 use sync::Mutex;
17 use thiserror::Error as ThisError;
18 
19 use crate::virtio::video::resource::GuestResource;
20 
21 /// Manages a pollable queue of events to be sent to the decoder or encoder.
22 pub struct EventQueue<T> {
23     /// Pipe used to signal available events.
24     event: Event,
25     /// FIFO of all pending events.
26     pending_events: VecDeque<T>,
27 }
28 
29 impl<T> EventQueue<T> {
30     /// Create a new event queue.
new() -> base::Result<Self>31     pub fn new() -> base::Result<Self> {
32         Ok(Self {
33             // Use semaphore semantics so `eventfd` can be `read` as many times as it has been
34             // `write`n to without blocking.
35             event: Event::new()?,
36             pending_events: Default::default(),
37         })
38     }
39 
40     /// Add `event` to the queue.
queue_event(&mut self, event: T) -> base::Result<()>41     pub fn queue_event(&mut self, event: T) -> base::Result<()> {
42         self.pending_events.push_back(event);
43         self.event.write_count(1)?;
44         Ok(())
45     }
46 
47     /// Read the next event, blocking until an event becomes available.
dequeue_event(&mut self) -> base::Result<T>48     pub fn dequeue_event(&mut self) -> base::Result<T> {
49         // Wait until at least one event is written, if necessary.
50         let cpt = self.event.read_count()?;
51         let event = match self.pending_events.pop_front() {
52             Some(event) => event,
53             None => panic!("event signaled but no pending event - this is a bug."),
54         };
55         // If we have more than one event pending, write the remainder back into the event so it
56         // keeps signalling.
57         if cpt > 1 {
58             self.event.write_count(cpt - 1)?;
59         }
60 
61         Ok(event)
62     }
63 
64     /// Remove all the posted events for which `predicate` returns `false`.
retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)65     pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) {
66         if !self.pending_events.is_empty() {
67             let _ = self
68                 .event
69                 .wait_timeout(Duration::from_millis(0))
70                 .expect("wait_timeout failure");
71         }
72 
73         self.pending_events.retain(predicate);
74 
75         let num_pending_events = self.pending_events.len();
76         if num_pending_events > 0 {
77             self.event
78                 .write_count(num_pending_events as u64)
79                 .expect("write failure");
80         }
81     }
82 
83     /// Returns the number of events currently pending on this queue, i.e. the number of times
84     /// `dequeue_event` can be called without blocking.
85     #[cfg(test)]
len(&self) -> usize86     pub fn len(&self) -> usize {
87         self.pending_events.len()
88     }
89 }
90 
91 impl<T> AsRawDescriptor for EventQueue<T> {
as_raw_descriptor(&self) -> base::RawDescriptor92     fn as_raw_descriptor(&self) -> base::RawDescriptor {
93         self.event.as_raw_descriptor()
94     }
95 }
96 
97 /// An `EventQueue` that is `Sync`, `Send`, and non-mut - i.e. that can easily be passed across
98 /// threads and wrapped into a `Rc` or `Arc`.
99 pub struct SyncEventQueue<T>(Mutex<EventQueue<T>>);
100 
101 impl<T> From<EventQueue<T>> for SyncEventQueue<T> {
from(queue: EventQueue<T>) -> Self102     fn from(queue: EventQueue<T>) -> Self {
103         Self(Mutex::new(queue))
104     }
105 }
106 
107 impl<T> SyncEventQueue<T> {
108     /// Add `event` to the queue.
queue_event(&self, event: T) -> base::Result<()>109     pub fn queue_event(&self, event: T) -> base::Result<()> {
110         self.0.lock().queue_event(event)
111     }
112 
113     /// Read the next event, blocking until an event becomes available.
dequeue_event(&self) -> base::Result<T>114     pub fn dequeue_event(&self) -> base::Result<T> {
115         self.0.lock().dequeue_event()
116     }
117 
118     /// Remove all the posted events for which `predicate` returns `false`.
retain<P: FnMut(&T) -> bool>(&self, predicate: P)119     pub fn retain<P: FnMut(&T) -> bool>(&self, predicate: P) {
120         self.0.lock().retain(predicate)
121     }
122 
123     /// Returns the number of events currently pending on this queue, i.e. the number of times
124     /// `dequeue_event` can be called without blocking.
125     #[cfg(test)]
len(&self) -> usize126     pub fn len(&self) -> usize {
127         self.0.lock().len()
128     }
129 }
130 
131 impl<T> AsRawDescriptor for SyncEventQueue<T> {
as_raw_descriptor(&self) -> base::RawDescriptor132     fn as_raw_descriptor(&self) -> base::RawDescriptor {
133         self.0.lock().as_raw_descriptor()
134     }
135 }
136 
137 /// Queue of all the output buffers provided by crosvm.
138 pub struct OutputQueue {
139     // Max number of output buffers that can be imported into this queue.
140     num_buffers: usize,
141     // Maps picture IDs to the corresponding guest resource.
142     buffers: BTreeMap<u32, GuestResource>,
143     // Picture IDs of output buffers we can write into.
144     ready_buffers: VecDeque<u32>,
145 }
146 
147 #[derive(Debug, ThisError)]
148 pub enum OutputBufferImportError {
149     #[error("maximum number of imported buffers ({0}) already reached")]
150     MaxBuffersReached(usize),
151     #[error("a buffer with picture ID {0} is already imported")]
152     AlreadyImported(u32),
153 }
154 
155 #[derive(Debug, ThisError)]
156 pub enum OutputBufferReuseError {
157     #[error("no buffer with picture ID {0} is imported at the moment")]
158     NotYetImported(u32),
159     #[error("buffer with picture ID {0} is already ready for use")]
160     AlreadyUsed(u32),
161 }
162 
163 impl OutputQueue {
164     /// Creates a new output queue capable of containing `num_buffers` buffers.
new(num_buffers: usize) -> Self165     pub fn new(num_buffers: usize) -> Self {
166         Self {
167             num_buffers,
168             buffers: Default::default(),
169             ready_buffers: Default::default(),
170         }
171     }
172 
173     /// Import a buffer, i.e. associate the buffer's `resource` to a given `picture_buffer_id`, and
174     /// make the buffer ready for use.
175     ///
176     /// A buffer with a given `picture_buffer_id` can only be imported once.
import_buffer( &mut self, picture_buffer_id: u32, resource: GuestResource, ) -> Result<(), OutputBufferImportError>177     pub fn import_buffer(
178         &mut self,
179         picture_buffer_id: u32,
180         resource: GuestResource,
181     ) -> Result<(), OutputBufferImportError> {
182         if self.buffers.len() >= self.num_buffers {
183             return Err(OutputBufferImportError::MaxBuffersReached(self.num_buffers));
184         }
185 
186         match self.buffers.entry(picture_buffer_id) {
187             Entry::Vacant(o) => {
188                 o.insert(resource);
189             }
190             Entry::Occupied(_) => {
191                 return Err(OutputBufferImportError::AlreadyImported(picture_buffer_id));
192             }
193         }
194 
195         self.ready_buffers.push_back(picture_buffer_id);
196 
197         Ok(())
198     }
199 
200     /// Mark the previously-imported buffer with ID `picture_buffer_id` as ready for being used.
reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError>201     pub fn reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError> {
202         if !self.buffers.contains_key(&picture_buffer_id) {
203             return Err(OutputBufferReuseError::NotYetImported(picture_buffer_id));
204         }
205 
206         if self.ready_buffers.contains(&picture_buffer_id) {
207             return Err(OutputBufferReuseError::AlreadyUsed(picture_buffer_id));
208         }
209 
210         self.ready_buffers.push_back(picture_buffer_id);
211 
212         Ok(())
213     }
214 
215     /// Get a buffer ready to be decoded into, if any is available.
try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)>216     pub fn try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)> {
217         let picture_buffer_id = self.ready_buffers.pop_front()?;
218         // Unwrapping is safe here because our interface guarantees that ids in `ready_buffers` are
219         // valid keys for `buffers`.
220         Some((
221             picture_buffer_id,
222             self.buffers
223                 .get_mut(&picture_buffer_id)
224                 .expect("expected buffer not present in queue"),
225         ))
226     }
227 
clear_ready_buffers(&mut self)228     pub fn clear_ready_buffers(&mut self) {
229         self.ready_buffers.clear();
230     }
231 }
232 
233 #[cfg(test)]
234 mod tests {
235     use std::time::Duration;
236 
237     use base::EventToken;
238     use base::WaitContext;
239 
240     use super::*;
241     use crate::virtio::video::error::VideoError;
242     use crate::virtio::video::error::VideoResult;
243     use crate::virtio::video::format::Rect;
244 
245     /// This is the same as DecoderEvent but copied here so that the test can be compiled
246     /// without depending on the "video-decoder" feature.
247     #[derive(Debug)]
248     pub enum TestEvent {
249         #[allow(dead_code)]
250         ProvidePictureBuffers {
251             min_num_buffers: u32,
252             width: i32,
253             height: i32,
254             visible_rect: Rect,
255         },
256         PictureReady {
257             picture_buffer_id: i32,
258             timestamp: u64,
259             visible_rect: Rect,
260         },
261         NotifyEndOfBitstreamBuffer(u32),
262         #[allow(dead_code)]
263         NotifyError(VideoError),
264         #[allow(dead_code)]
265         FlushCompleted(VideoResult<()>),
266         #[allow(dead_code)]
267         ResetCompleted(VideoResult<()>),
268     }
269 
270     /// Test basic queue/dequeue functionality of `EventQueue`.
271     #[test]
event_queue()272     fn event_queue() {
273         let mut event_queue = EventQueue::new().unwrap();
274 
275         assert_eq!(
276             event_queue.queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1)),
277             Ok(())
278         );
279         assert_eq!(event_queue.len(), 1);
280         assert_eq!(
281             event_queue.queue_event(TestEvent::PictureReady {
282                 picture_buffer_id: 0,
283                 timestamp: 42,
284                 visible_rect: Rect {
285                     left: 0,
286                     top: 0,
287                     right: 320,
288                     bottom: 240,
289                 },
290             }),
291             Ok(())
292         );
293         assert_eq!(event_queue.len(), 2);
294 
295         assert!(matches!(
296             event_queue.dequeue_event(),
297             Ok(TestEvent::NotifyEndOfBitstreamBuffer(1))
298         ));
299         assert_eq!(event_queue.len(), 1);
300         assert!(matches!(
301             event_queue.dequeue_event(),
302             Ok(TestEvent::PictureReady {
303                 picture_buffer_id: 0,
304                 timestamp: 42,
305                 visible_rect: Rect {
306                     left: 0,
307                     top: 0,
308                     right: 320,
309                     bottom: 240,
310                 }
311             })
312         ));
313         assert_eq!(event_queue.len(), 0);
314     }
315 
316     /// Test polling of `TestEventQueue`'s `event_pipe`.
317     #[test]
decoder_event_queue_polling()318     fn decoder_event_queue_polling() {
319         #[derive(EventToken)]
320         enum Token {
321             Event,
322         }
323 
324         let mut event_queue = EventQueue::new().unwrap();
325         let wait_context = WaitContext::build_with(&[(&event_queue, Token::Event)]).unwrap();
326 
327         // The queue is empty, so `event_pipe` should not signal.
328         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0);
329 
330         // `event_pipe` should signal as long as the queue is not empty.
331         event_queue
332             .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1))
333             .unwrap();
334         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
335         event_queue
336             .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(2))
337             .unwrap();
338         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
339         event_queue
340             .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(3))
341             .unwrap();
342         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
343 
344         event_queue.dequeue_event().unwrap();
345         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
346         event_queue.dequeue_event().unwrap();
347         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1);
348         event_queue.dequeue_event().unwrap();
349 
350         // The queue is empty again, so `event_pipe` should not signal.
351         assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0);
352     }
353 }
354