1*1b4853f5SAndroid Build Coastguard Worker // Copyright 2024 The ChromiumOS Authors 2*1b4853f5SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be 3*1b4853f5SAndroid Build Coastguard Worker // found in the LICENSE file. 4*1b4853f5SAndroid Build Coastguard Worker 5*1b4853f5SAndroid Build Coastguard Worker use std::{ 6*1b4853f5SAndroid Build Coastguard Worker collections::VecDeque, 7*1b4853f5SAndroid Build Coastguard Worker os::fd::{AsFd, BorrowedFd}, 8*1b4853f5SAndroid Build Coastguard Worker }; 9*1b4853f5SAndroid Build Coastguard Worker 10*1b4853f5SAndroid Build Coastguard Worker use nix::sys::eventfd::EventFd; 11*1b4853f5SAndroid Build Coastguard Worker 12*1b4853f5SAndroid Build Coastguard Worker /// Manages a pollable queue of events to be sent to the decoder or encoder. 13*1b4853f5SAndroid Build Coastguard Worker pub struct EventQueue<T> { 14*1b4853f5SAndroid Build Coastguard Worker /// Pipe used to signal available events. 15*1b4853f5SAndroid Build Coastguard Worker event: EventFd, 16*1b4853f5SAndroid Build Coastguard Worker /// FIFO of all pending events. 17*1b4853f5SAndroid Build Coastguard Worker pending_events: VecDeque<T>, 18*1b4853f5SAndroid Build Coastguard Worker } 19*1b4853f5SAndroid Build Coastguard Worker 20*1b4853f5SAndroid Build Coastguard Worker impl<T> EventQueue<T> { 21*1b4853f5SAndroid Build Coastguard Worker /// Create a new event queue. new() -> nix::Result<Self>22*1b4853f5SAndroid Build Coastguard Worker pub fn new() -> nix::Result<Self> { 23*1b4853f5SAndroid Build Coastguard Worker EventFd::new().map(|event| Self { 24*1b4853f5SAndroid Build Coastguard Worker event, 25*1b4853f5SAndroid Build Coastguard Worker pending_events: Default::default(), 26*1b4853f5SAndroid Build Coastguard Worker }) 27*1b4853f5SAndroid Build Coastguard Worker } 28*1b4853f5SAndroid Build Coastguard Worker 29*1b4853f5SAndroid Build Coastguard Worker /// Add `event` to the queue. 30*1b4853f5SAndroid Build Coastguard Worker /// 31*1b4853f5SAndroid Build Coastguard Worker /// Returns an error if the poll FD could not be signaled. queue_event(&mut self, event: T) -> nix::Result<()>32*1b4853f5SAndroid Build Coastguard Worker pub fn queue_event(&mut self, event: T) -> nix::Result<()> { 33*1b4853f5SAndroid Build Coastguard Worker self.pending_events.push_back(event); 34*1b4853f5SAndroid Build Coastguard Worker if self.pending_events.len() == 1 { 35*1b4853f5SAndroid Build Coastguard Worker let _ = self.event.write(1)?; 36*1b4853f5SAndroid Build Coastguard Worker } 37*1b4853f5SAndroid Build Coastguard Worker 38*1b4853f5SAndroid Build Coastguard Worker Ok(()) 39*1b4853f5SAndroid Build Coastguard Worker } 40*1b4853f5SAndroid Build Coastguard Worker 41*1b4853f5SAndroid Build Coastguard Worker /// Read and return the next event, if any. dequeue_event(&mut self) -> Option<T>42*1b4853f5SAndroid Build Coastguard Worker pub fn dequeue_event(&mut self) -> Option<T> { 43*1b4853f5SAndroid Build Coastguard Worker let event = self.pending_events.pop_front(); 44*1b4853f5SAndroid Build Coastguard Worker 45*1b4853f5SAndroid Build Coastguard Worker if event.is_some() && self.pending_events.is_empty() { 46*1b4853f5SAndroid Build Coastguard Worker let _ = self 47*1b4853f5SAndroid Build Coastguard Worker .event 48*1b4853f5SAndroid Build Coastguard Worker .read() 49*1b4853f5SAndroid Build Coastguard Worker .map_err(|e| log::error!("error while reading event queue fd: {:#}", e)); 50*1b4853f5SAndroid Build Coastguard Worker } 51*1b4853f5SAndroid Build Coastguard Worker 52*1b4853f5SAndroid Build Coastguard Worker event 53*1b4853f5SAndroid Build Coastguard Worker } 54*1b4853f5SAndroid Build Coastguard Worker 55*1b4853f5SAndroid Build Coastguard Worker /// Remove all the posted events for which `predicate` returns `false`. retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)56*1b4853f5SAndroid Build Coastguard Worker pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) { 57*1b4853f5SAndroid Build Coastguard Worker let was_empty = self.pending_events.is_empty(); 58*1b4853f5SAndroid Build Coastguard Worker 59*1b4853f5SAndroid Build Coastguard Worker self.pending_events.retain(predicate); 60*1b4853f5SAndroid Build Coastguard Worker 61*1b4853f5SAndroid Build Coastguard Worker if !was_empty && self.pending_events.is_empty() { 62*1b4853f5SAndroid Build Coastguard Worker let _ = self 63*1b4853f5SAndroid Build Coastguard Worker .event 64*1b4853f5SAndroid Build Coastguard Worker .read() 65*1b4853f5SAndroid Build Coastguard Worker .map_err(|e| log::error!("error while reading event queue fd: {:#}", e)); 66*1b4853f5SAndroid Build Coastguard Worker } 67*1b4853f5SAndroid Build Coastguard Worker } 68*1b4853f5SAndroid Build Coastguard Worker 69*1b4853f5SAndroid Build Coastguard Worker /// Returns the number of events currently pending on this queue, i.e. the number of times 70*1b4853f5SAndroid Build Coastguard Worker /// `dequeue_event` can be called without blocking. 71*1b4853f5SAndroid Build Coastguard Worker #[cfg(test)] len(&self) -> usize72*1b4853f5SAndroid Build Coastguard Worker pub fn len(&self) -> usize { 73*1b4853f5SAndroid Build Coastguard Worker self.pending_events.len() 74*1b4853f5SAndroid Build Coastguard Worker } 75*1b4853f5SAndroid Build Coastguard Worker } 76*1b4853f5SAndroid Build Coastguard Worker 77*1b4853f5SAndroid Build Coastguard Worker impl<T> AsFd for EventQueue<T> { as_fd(&self) -> BorrowedFd78*1b4853f5SAndroid Build Coastguard Worker fn as_fd(&self) -> BorrowedFd { 79*1b4853f5SAndroid Build Coastguard Worker self.event.as_fd() 80*1b4853f5SAndroid Build Coastguard Worker } 81*1b4853f5SAndroid Build Coastguard Worker } 82*1b4853f5SAndroid Build Coastguard Worker 83*1b4853f5SAndroid Build Coastguard Worker #[cfg(test)] 84*1b4853f5SAndroid Build Coastguard Worker mod tests { 85*1b4853f5SAndroid Build Coastguard Worker use nix::sys::epoll::*; 86*1b4853f5SAndroid Build Coastguard Worker use virtio_media::devices::video_decoder::VideoDecoderBackendEvent; 87*1b4853f5SAndroid Build Coastguard Worker use virtio_media::v4l2r::bindings; 88*1b4853f5SAndroid Build Coastguard Worker 89*1b4853f5SAndroid Build Coastguard Worker use super::*; 90*1b4853f5SAndroid Build Coastguard Worker 91*1b4853f5SAndroid Build Coastguard Worker /// Test basic queue/dequeue functionality of `EventQueue`. 92*1b4853f5SAndroid Build Coastguard Worker #[test] event_queue()93*1b4853f5SAndroid Build Coastguard Worker fn event_queue() { 94*1b4853f5SAndroid Build Coastguard Worker let mut event_queue = EventQueue::new().unwrap(); 95*1b4853f5SAndroid Build Coastguard Worker 96*1b4853f5SAndroid Build Coastguard Worker event_queue 97*1b4853f5SAndroid Build Coastguard Worker .queue_event(VideoDecoderBackendEvent::InputBufferDone(1)) 98*1b4853f5SAndroid Build Coastguard Worker .unwrap(); 99*1b4853f5SAndroid Build Coastguard Worker assert_eq!(event_queue.len(), 1); 100*1b4853f5SAndroid Build Coastguard Worker event_queue 101*1b4853f5SAndroid Build Coastguard Worker .queue_event(VideoDecoderBackendEvent::FrameCompleted { 102*1b4853f5SAndroid Build Coastguard Worker buffer_id: 1, 103*1b4853f5SAndroid Build Coastguard Worker timestamp: bindings::timeval { 104*1b4853f5SAndroid Build Coastguard Worker tv_sec: 10, 105*1b4853f5SAndroid Build Coastguard Worker tv_usec: 42, 106*1b4853f5SAndroid Build Coastguard Worker }, 107*1b4853f5SAndroid Build Coastguard Worker bytes_used: vec![1024], 108*1b4853f5SAndroid Build Coastguard Worker is_last: false, 109*1b4853f5SAndroid Build Coastguard Worker }) 110*1b4853f5SAndroid Build Coastguard Worker .unwrap(); 111*1b4853f5SAndroid Build Coastguard Worker assert_eq!(event_queue.len(), 2); 112*1b4853f5SAndroid Build Coastguard Worker 113*1b4853f5SAndroid Build Coastguard Worker assert!(matches!( 114*1b4853f5SAndroid Build Coastguard Worker event_queue.dequeue_event(), 115*1b4853f5SAndroid Build Coastguard Worker Some(VideoDecoderBackendEvent::InputBufferDone(1)) 116*1b4853f5SAndroid Build Coastguard Worker )); 117*1b4853f5SAndroid Build Coastguard Worker assert_eq!(event_queue.len(), 1); 118*1b4853f5SAndroid Build Coastguard Worker assert_eq!( 119*1b4853f5SAndroid Build Coastguard Worker event_queue.dequeue_event(), 120*1b4853f5SAndroid Build Coastguard Worker Some(VideoDecoderBackendEvent::FrameCompleted { 121*1b4853f5SAndroid Build Coastguard Worker buffer_id: 1, 122*1b4853f5SAndroid Build Coastguard Worker timestamp: bindings::timeval { 123*1b4853f5SAndroid Build Coastguard Worker tv_sec: 10, 124*1b4853f5SAndroid Build Coastguard Worker tv_usec: 42 125*1b4853f5SAndroid Build Coastguard Worker }, 126*1b4853f5SAndroid Build Coastguard Worker bytes_used: vec![1024], 127*1b4853f5SAndroid Build Coastguard Worker is_last: false, 128*1b4853f5SAndroid Build Coastguard Worker }) 129*1b4853f5SAndroid Build Coastguard Worker ); 130*1b4853f5SAndroid Build Coastguard Worker assert_eq!(event_queue.len(), 0); 131*1b4853f5SAndroid Build Coastguard Worker } 132*1b4853f5SAndroid Build Coastguard Worker 133*1b4853f5SAndroid Build Coastguard Worker /// Test polling of `TestEventQueue`'s `event_pipe`. 134*1b4853f5SAndroid Build Coastguard Worker #[test] decoder_event_queue_polling()135*1b4853f5SAndroid Build Coastguard Worker fn decoder_event_queue_polling() { 136*1b4853f5SAndroid Build Coastguard Worker let mut event_queue = EventQueue::new().unwrap(); 137*1b4853f5SAndroid Build Coastguard Worker let epoll = Epoll::new(EpollCreateFlags::empty()).unwrap(); 138*1b4853f5SAndroid Build Coastguard Worker epoll 139*1b4853f5SAndroid Build Coastguard Worker .add(event_queue.as_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 1)) 140*1b4853f5SAndroid Build Coastguard Worker .unwrap(); 141*1b4853f5SAndroid Build Coastguard Worker 142*1b4853f5SAndroid Build Coastguard Worker // The queue is empty, so `event_pipe` should not signal. 143*1b4853f5SAndroid Build Coastguard Worker let mut events = [EpollEvent::empty()]; 144*1b4853f5SAndroid Build Coastguard Worker let nb_fds = epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(); 145*1b4853f5SAndroid Build Coastguard Worker assert_eq!(nb_fds, 0); 146*1b4853f5SAndroid Build Coastguard Worker assert_eq!(event_queue.dequeue_event(), None); 147*1b4853f5SAndroid Build Coastguard Worker 148*1b4853f5SAndroid Build Coastguard Worker // `event_pipe` should signal as long as the queue is not empty. 149*1b4853f5SAndroid Build Coastguard Worker event_queue 150*1b4853f5SAndroid Build Coastguard Worker .queue_event(VideoDecoderBackendEvent::InputBufferDone(1)) 151*1b4853f5SAndroid Build Coastguard Worker .unwrap(); 152*1b4853f5SAndroid Build Coastguard Worker assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 153*1b4853f5SAndroid Build Coastguard Worker event_queue 154*1b4853f5SAndroid Build Coastguard Worker .queue_event(VideoDecoderBackendEvent::InputBufferDone(2)) 155*1b4853f5SAndroid Build Coastguard Worker .unwrap(); 156*1b4853f5SAndroid Build Coastguard Worker assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 157*1b4853f5SAndroid Build Coastguard Worker event_queue 158*1b4853f5SAndroid Build Coastguard Worker .queue_event(VideoDecoderBackendEvent::InputBufferDone(3)) 159*1b4853f5SAndroid Build Coastguard Worker .unwrap(); 160*1b4853f5SAndroid Build Coastguard Worker assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 161*1b4853f5SAndroid Build Coastguard Worker 162*1b4853f5SAndroid Build Coastguard Worker assert_eq!( 163*1b4853f5SAndroid Build Coastguard Worker event_queue.dequeue_event(), 164*1b4853f5SAndroid Build Coastguard Worker Some(VideoDecoderBackendEvent::InputBufferDone(1)) 165*1b4853f5SAndroid Build Coastguard Worker ); 166*1b4853f5SAndroid Build Coastguard Worker assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 167*1b4853f5SAndroid Build Coastguard Worker assert_eq!( 168*1b4853f5SAndroid Build Coastguard Worker event_queue.dequeue_event(), 169*1b4853f5SAndroid Build Coastguard Worker Some(VideoDecoderBackendEvent::InputBufferDone(2)) 170*1b4853f5SAndroid Build Coastguard Worker ); 171*1b4853f5SAndroid Build Coastguard Worker assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 172*1b4853f5SAndroid Build Coastguard Worker assert_eq!( 173*1b4853f5SAndroid Build Coastguard Worker event_queue.dequeue_event(), 174*1b4853f5SAndroid Build Coastguard Worker Some(VideoDecoderBackendEvent::InputBufferDone(3)) 175*1b4853f5SAndroid Build Coastguard Worker ); 176*1b4853f5SAndroid Build Coastguard Worker 177*1b4853f5SAndroid Build Coastguard Worker // The queue is empty again, so `event_pipe` should not signal. 178*1b4853f5SAndroid Build Coastguard Worker assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 0); 179*1b4853f5SAndroid Build Coastguard Worker assert_eq!(event_queue.dequeue_event(), None); 180*1b4853f5SAndroid Build Coastguard Worker } 181*1b4853f5SAndroid Build Coastguard Worker } 182