1 // Copyright 2022 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // Not all video backends make use of the tools in this module. 6 #![allow(dead_code)] 7 8 use std::collections::btree_map::Entry; 9 use std::collections::BTreeMap; 10 use std::collections::VecDeque; 11 use std::time::Duration; 12 13 use base::AsRawDescriptor; 14 use base::Event; 15 use base::EventExt; 16 use sync::Mutex; 17 use thiserror::Error as ThisError; 18 19 use crate::virtio::video::resource::GuestResource; 20 21 /// Manages a pollable queue of events to be sent to the decoder or encoder. 22 pub struct EventQueue<T> { 23 /// Pipe used to signal available events. 24 event: Event, 25 /// FIFO of all pending events. 26 pending_events: VecDeque<T>, 27 } 28 29 impl<T> EventQueue<T> { 30 /// Create a new event queue. new() -> base::Result<Self>31 pub fn new() -> base::Result<Self> { 32 Ok(Self { 33 // Use semaphore semantics so `eventfd` can be `read` as many times as it has been 34 // `write`n to without blocking. 35 event: Event::new()?, 36 pending_events: Default::default(), 37 }) 38 } 39 40 /// Add `event` to the queue. queue_event(&mut self, event: T) -> base::Result<()>41 pub fn queue_event(&mut self, event: T) -> base::Result<()> { 42 self.pending_events.push_back(event); 43 self.event.write_count(1)?; 44 Ok(()) 45 } 46 47 /// Read the next event, blocking until an event becomes available. dequeue_event(&mut self) -> base::Result<T>48 pub fn dequeue_event(&mut self) -> base::Result<T> { 49 // Wait until at least one event is written, if necessary. 50 let cpt = self.event.read_count()?; 51 let event = match self.pending_events.pop_front() { 52 Some(event) => event, 53 None => panic!("event signaled but no pending event - this is a bug."), 54 }; 55 // If we have more than one event pending, write the remainder back into the event so it 56 // keeps signalling. 57 if cpt > 1 { 58 self.event.write_count(cpt - 1)?; 59 } 60 61 Ok(event) 62 } 63 64 /// Remove all the posted events for which `predicate` returns `false`. retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)65 pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) { 66 if !self.pending_events.is_empty() { 67 let _ = self 68 .event 69 .wait_timeout(Duration::from_millis(0)) 70 .expect("wait_timeout failure"); 71 } 72 73 self.pending_events.retain(predicate); 74 75 let num_pending_events = self.pending_events.len(); 76 if num_pending_events > 0 { 77 self.event 78 .write_count(num_pending_events as u64) 79 .expect("write failure"); 80 } 81 } 82 83 /// Returns the number of events currently pending on this queue, i.e. the number of times 84 /// `dequeue_event` can be called without blocking. 85 #[cfg(test)] len(&self) -> usize86 pub fn len(&self) -> usize { 87 self.pending_events.len() 88 } 89 } 90 91 impl<T> AsRawDescriptor for EventQueue<T> { as_raw_descriptor(&self) -> base::RawDescriptor92 fn as_raw_descriptor(&self) -> base::RawDescriptor { 93 self.event.as_raw_descriptor() 94 } 95 } 96 97 /// An `EventQueue` that is `Sync`, `Send`, and non-mut - i.e. that can easily be passed across 98 /// threads and wrapped into a `Rc` or `Arc`. 99 pub struct SyncEventQueue<T>(Mutex<EventQueue<T>>); 100 101 impl<T> From<EventQueue<T>> for SyncEventQueue<T> { from(queue: EventQueue<T>) -> Self102 fn from(queue: EventQueue<T>) -> Self { 103 Self(Mutex::new(queue)) 104 } 105 } 106 107 impl<T> SyncEventQueue<T> { 108 /// Add `event` to the queue. queue_event(&self, event: T) -> base::Result<()>109 pub fn queue_event(&self, event: T) -> base::Result<()> { 110 self.0.lock().queue_event(event) 111 } 112 113 /// Read the next event, blocking until an event becomes available. dequeue_event(&self) -> base::Result<T>114 pub fn dequeue_event(&self) -> base::Result<T> { 115 self.0.lock().dequeue_event() 116 } 117 118 /// Remove all the posted events for which `predicate` returns `false`. retain<P: FnMut(&T) -> bool>(&self, predicate: P)119 pub fn retain<P: FnMut(&T) -> bool>(&self, predicate: P) { 120 self.0.lock().retain(predicate) 121 } 122 123 /// Returns the number of events currently pending on this queue, i.e. the number of times 124 /// `dequeue_event` can be called without blocking. 125 #[cfg(test)] len(&self) -> usize126 pub fn len(&self) -> usize { 127 self.0.lock().len() 128 } 129 } 130 131 impl<T> AsRawDescriptor for SyncEventQueue<T> { as_raw_descriptor(&self) -> base::RawDescriptor132 fn as_raw_descriptor(&self) -> base::RawDescriptor { 133 self.0.lock().as_raw_descriptor() 134 } 135 } 136 137 /// Queue of all the output buffers provided by crosvm. 138 pub struct OutputQueue { 139 // Max number of output buffers that can be imported into this queue. 140 num_buffers: usize, 141 // Maps picture IDs to the corresponding guest resource. 142 buffers: BTreeMap<u32, GuestResource>, 143 // Picture IDs of output buffers we can write into. 144 ready_buffers: VecDeque<u32>, 145 } 146 147 #[derive(Debug, ThisError)] 148 pub enum OutputBufferImportError { 149 #[error("maximum number of imported buffers ({0}) already reached")] 150 MaxBuffersReached(usize), 151 #[error("a buffer with picture ID {0} is already imported")] 152 AlreadyImported(u32), 153 } 154 155 #[derive(Debug, ThisError)] 156 pub enum OutputBufferReuseError { 157 #[error("no buffer with picture ID {0} is imported at the moment")] 158 NotYetImported(u32), 159 #[error("buffer with picture ID {0} is already ready for use")] 160 AlreadyUsed(u32), 161 } 162 163 impl OutputQueue { 164 /// Creates a new output queue capable of containing `num_buffers` buffers. new(num_buffers: usize) -> Self165 pub fn new(num_buffers: usize) -> Self { 166 Self { 167 num_buffers, 168 buffers: Default::default(), 169 ready_buffers: Default::default(), 170 } 171 } 172 173 /// Import a buffer, i.e. associate the buffer's `resource` to a given `picture_buffer_id`, and 174 /// make the buffer ready for use. 175 /// 176 /// A buffer with a given `picture_buffer_id` can only be imported once. import_buffer( &mut self, picture_buffer_id: u32, resource: GuestResource, ) -> Result<(), OutputBufferImportError>177 pub fn import_buffer( 178 &mut self, 179 picture_buffer_id: u32, 180 resource: GuestResource, 181 ) -> Result<(), OutputBufferImportError> { 182 if self.buffers.len() >= self.num_buffers { 183 return Err(OutputBufferImportError::MaxBuffersReached(self.num_buffers)); 184 } 185 186 match self.buffers.entry(picture_buffer_id) { 187 Entry::Vacant(o) => { 188 o.insert(resource); 189 } 190 Entry::Occupied(_) => { 191 return Err(OutputBufferImportError::AlreadyImported(picture_buffer_id)); 192 } 193 } 194 195 self.ready_buffers.push_back(picture_buffer_id); 196 197 Ok(()) 198 } 199 200 /// Mark the previously-imported buffer with ID `picture_buffer_id` as ready for being used. reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError>201 pub fn reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError> { 202 if !self.buffers.contains_key(&picture_buffer_id) { 203 return Err(OutputBufferReuseError::NotYetImported(picture_buffer_id)); 204 } 205 206 if self.ready_buffers.contains(&picture_buffer_id) { 207 return Err(OutputBufferReuseError::AlreadyUsed(picture_buffer_id)); 208 } 209 210 self.ready_buffers.push_back(picture_buffer_id); 211 212 Ok(()) 213 } 214 215 /// Get a buffer ready to be decoded into, if any is available. try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)>216 pub fn try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)> { 217 let picture_buffer_id = self.ready_buffers.pop_front()?; 218 // Unwrapping is safe here because our interface guarantees that ids in `ready_buffers` are 219 // valid keys for `buffers`. 220 Some(( 221 picture_buffer_id, 222 self.buffers 223 .get_mut(&picture_buffer_id) 224 .expect("expected buffer not present in queue"), 225 )) 226 } 227 clear_ready_buffers(&mut self)228 pub fn clear_ready_buffers(&mut self) { 229 self.ready_buffers.clear(); 230 } 231 } 232 233 #[cfg(test)] 234 mod tests { 235 use std::time::Duration; 236 237 use base::EventToken; 238 use base::WaitContext; 239 240 use super::*; 241 use crate::virtio::video::error::VideoError; 242 use crate::virtio::video::error::VideoResult; 243 use crate::virtio::video::format::Rect; 244 245 /// This is the same as DecoderEvent but copied here so that the test can be compiled 246 /// without depending on the "video-decoder" feature. 247 #[derive(Debug)] 248 pub enum TestEvent { 249 #[allow(dead_code)] 250 ProvidePictureBuffers { 251 min_num_buffers: u32, 252 width: i32, 253 height: i32, 254 visible_rect: Rect, 255 }, 256 PictureReady { 257 picture_buffer_id: i32, 258 timestamp: u64, 259 visible_rect: Rect, 260 }, 261 NotifyEndOfBitstreamBuffer(u32), 262 #[allow(dead_code)] 263 NotifyError(VideoError), 264 #[allow(dead_code)] 265 FlushCompleted(VideoResult<()>), 266 #[allow(dead_code)] 267 ResetCompleted(VideoResult<()>), 268 } 269 270 /// Test basic queue/dequeue functionality of `EventQueue`. 271 #[test] event_queue()272 fn event_queue() { 273 let mut event_queue = EventQueue::new().unwrap(); 274 275 assert_eq!( 276 event_queue.queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1)), 277 Ok(()) 278 ); 279 assert_eq!(event_queue.len(), 1); 280 assert_eq!( 281 event_queue.queue_event(TestEvent::PictureReady { 282 picture_buffer_id: 0, 283 timestamp: 42, 284 visible_rect: Rect { 285 left: 0, 286 top: 0, 287 right: 320, 288 bottom: 240, 289 }, 290 }), 291 Ok(()) 292 ); 293 assert_eq!(event_queue.len(), 2); 294 295 assert!(matches!( 296 event_queue.dequeue_event(), 297 Ok(TestEvent::NotifyEndOfBitstreamBuffer(1)) 298 )); 299 assert_eq!(event_queue.len(), 1); 300 assert!(matches!( 301 event_queue.dequeue_event(), 302 Ok(TestEvent::PictureReady { 303 picture_buffer_id: 0, 304 timestamp: 42, 305 visible_rect: Rect { 306 left: 0, 307 top: 0, 308 right: 320, 309 bottom: 240, 310 } 311 }) 312 )); 313 assert_eq!(event_queue.len(), 0); 314 } 315 316 /// Test polling of `TestEventQueue`'s `event_pipe`. 317 #[test] decoder_event_queue_polling()318 fn decoder_event_queue_polling() { 319 #[derive(EventToken)] 320 enum Token { 321 Event, 322 } 323 324 let mut event_queue = EventQueue::new().unwrap(); 325 let wait_context = WaitContext::build_with(&[(&event_queue, Token::Event)]).unwrap(); 326 327 // The queue is empty, so `event_pipe` should not signal. 328 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0); 329 330 // `event_pipe` should signal as long as the queue is not empty. 331 event_queue 332 .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1)) 333 .unwrap(); 334 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 335 event_queue 336 .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(2)) 337 .unwrap(); 338 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 339 event_queue 340 .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(3)) 341 .unwrap(); 342 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 343 344 event_queue.dequeue_event().unwrap(); 345 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 346 event_queue.dequeue_event().unwrap(); 347 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 1); 348 event_queue.dequeue_event().unwrap(); 349 350 // The queue is empty again, so `event_pipe` should not signal. 351 assert_eq!(wait_context.wait_timeout(Duration::ZERO).unwrap().len(), 0); 352 } 353 } 354