xref: /aosp_15_r20/external/virtio-media/extras/ffmpeg-decoder/src/event_queue.rs (revision 1b4853f54772485c5dd4001ae33a7a958bcc97a1)
1*1b4853f5SAndroid Build Coastguard Worker // Copyright 2024 The ChromiumOS Authors
2*1b4853f5SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*1b4853f5SAndroid Build Coastguard Worker // found in the LICENSE file.
4*1b4853f5SAndroid Build Coastguard Worker 
5*1b4853f5SAndroid Build Coastguard Worker use std::{
6*1b4853f5SAndroid Build Coastguard Worker     collections::VecDeque,
7*1b4853f5SAndroid Build Coastguard Worker     os::fd::{AsFd, BorrowedFd},
8*1b4853f5SAndroid Build Coastguard Worker };
9*1b4853f5SAndroid Build Coastguard Worker 
10*1b4853f5SAndroid Build Coastguard Worker use nix::sys::eventfd::EventFd;
11*1b4853f5SAndroid Build Coastguard Worker 
12*1b4853f5SAndroid Build Coastguard Worker /// Manages a pollable queue of events to be sent to the decoder or encoder.
13*1b4853f5SAndroid Build Coastguard Worker pub struct EventQueue<T> {
14*1b4853f5SAndroid Build Coastguard Worker     /// Pipe used to signal available events.
15*1b4853f5SAndroid Build Coastguard Worker     event: EventFd,
16*1b4853f5SAndroid Build Coastguard Worker     /// FIFO of all pending events.
17*1b4853f5SAndroid Build Coastguard Worker     pending_events: VecDeque<T>,
18*1b4853f5SAndroid Build Coastguard Worker }
19*1b4853f5SAndroid Build Coastguard Worker 
20*1b4853f5SAndroid Build Coastguard Worker impl<T> EventQueue<T> {
21*1b4853f5SAndroid Build Coastguard Worker     /// Create a new event queue.
new() -> nix::Result<Self>22*1b4853f5SAndroid Build Coastguard Worker     pub fn new() -> nix::Result<Self> {
23*1b4853f5SAndroid Build Coastguard Worker         EventFd::new().map(|event| Self {
24*1b4853f5SAndroid Build Coastguard Worker             event,
25*1b4853f5SAndroid Build Coastguard Worker             pending_events: Default::default(),
26*1b4853f5SAndroid Build Coastguard Worker         })
27*1b4853f5SAndroid Build Coastguard Worker     }
28*1b4853f5SAndroid Build Coastguard Worker 
29*1b4853f5SAndroid Build Coastguard Worker     /// Add `event` to the queue.
30*1b4853f5SAndroid Build Coastguard Worker     ///
31*1b4853f5SAndroid Build Coastguard Worker     /// Returns an error if the poll FD could not be signaled.
queue_event(&mut self, event: T) -> nix::Result<()>32*1b4853f5SAndroid Build Coastguard Worker     pub fn queue_event(&mut self, event: T) -> nix::Result<()> {
33*1b4853f5SAndroid Build Coastguard Worker         self.pending_events.push_back(event);
34*1b4853f5SAndroid Build Coastguard Worker         if self.pending_events.len() == 1 {
35*1b4853f5SAndroid Build Coastguard Worker             let _ = self.event.write(1)?;
36*1b4853f5SAndroid Build Coastguard Worker         }
37*1b4853f5SAndroid Build Coastguard Worker 
38*1b4853f5SAndroid Build Coastguard Worker         Ok(())
39*1b4853f5SAndroid Build Coastguard Worker     }
40*1b4853f5SAndroid Build Coastguard Worker 
41*1b4853f5SAndroid Build Coastguard Worker     /// Read and return the next event, if any.
dequeue_event(&mut self) -> Option<T>42*1b4853f5SAndroid Build Coastguard Worker     pub fn dequeue_event(&mut self) -> Option<T> {
43*1b4853f5SAndroid Build Coastguard Worker         let event = self.pending_events.pop_front();
44*1b4853f5SAndroid Build Coastguard Worker 
45*1b4853f5SAndroid Build Coastguard Worker         if event.is_some() && self.pending_events.is_empty() {
46*1b4853f5SAndroid Build Coastguard Worker             let _ = self
47*1b4853f5SAndroid Build Coastguard Worker                 .event
48*1b4853f5SAndroid Build Coastguard Worker                 .read()
49*1b4853f5SAndroid Build Coastguard Worker                 .map_err(|e| log::error!("error while reading event queue fd: {:#}", e));
50*1b4853f5SAndroid Build Coastguard Worker         }
51*1b4853f5SAndroid Build Coastguard Worker 
52*1b4853f5SAndroid Build Coastguard Worker         event
53*1b4853f5SAndroid Build Coastguard Worker     }
54*1b4853f5SAndroid Build Coastguard Worker 
55*1b4853f5SAndroid Build Coastguard Worker     /// Remove all the posted events for which `predicate` returns `false`.
retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)56*1b4853f5SAndroid Build Coastguard Worker     pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) {
57*1b4853f5SAndroid Build Coastguard Worker         let was_empty = self.pending_events.is_empty();
58*1b4853f5SAndroid Build Coastguard Worker 
59*1b4853f5SAndroid Build Coastguard Worker         self.pending_events.retain(predicate);
60*1b4853f5SAndroid Build Coastguard Worker 
61*1b4853f5SAndroid Build Coastguard Worker         if !was_empty && self.pending_events.is_empty() {
62*1b4853f5SAndroid Build Coastguard Worker             let _ = self
63*1b4853f5SAndroid Build Coastguard Worker                 .event
64*1b4853f5SAndroid Build Coastguard Worker                 .read()
65*1b4853f5SAndroid Build Coastguard Worker                 .map_err(|e| log::error!("error while reading event queue fd: {:#}", e));
66*1b4853f5SAndroid Build Coastguard Worker         }
67*1b4853f5SAndroid Build Coastguard Worker     }
68*1b4853f5SAndroid Build Coastguard Worker 
69*1b4853f5SAndroid Build Coastguard Worker     /// Returns the number of events currently pending on this queue, i.e. the number of times
70*1b4853f5SAndroid Build Coastguard Worker     /// `dequeue_event` can be called without blocking.
71*1b4853f5SAndroid Build Coastguard Worker     #[cfg(test)]
len(&self) -> usize72*1b4853f5SAndroid Build Coastguard Worker     pub fn len(&self) -> usize {
73*1b4853f5SAndroid Build Coastguard Worker         self.pending_events.len()
74*1b4853f5SAndroid Build Coastguard Worker     }
75*1b4853f5SAndroid Build Coastguard Worker }
76*1b4853f5SAndroid Build Coastguard Worker 
77*1b4853f5SAndroid Build Coastguard Worker impl<T> AsFd for EventQueue<T> {
as_fd(&self) -> BorrowedFd78*1b4853f5SAndroid Build Coastguard Worker     fn as_fd(&self) -> BorrowedFd {
79*1b4853f5SAndroid Build Coastguard Worker         self.event.as_fd()
80*1b4853f5SAndroid Build Coastguard Worker     }
81*1b4853f5SAndroid Build Coastguard Worker }
82*1b4853f5SAndroid Build Coastguard Worker 
#[cfg(test)]
mod tests {
    use nix::sys::epoll::*;
    use virtio_media::devices::video_decoder::VideoDecoderBackendEvent;
    use virtio_media::v4l2r::bindings;

    use super::*;

    /// Returns the number of ready FDs reported by `epoll`, without blocking.
    fn ready_fds(epoll: &Epoll) -> usize {
        let mut events = [EpollEvent::empty()];
        epoll.wait(&mut events, EpollTimeout::ZERO).unwrap()
    }

    /// Creates an epoll instance watching `event_queue`'s FD for readability.
    fn watch<T>(event_queue: &EventQueue<T>) -> Epoll {
        let epoll = Epoll::new(EpollCreateFlags::empty()).unwrap();
        epoll
            .add(event_queue.as_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 1))
            .unwrap();
        epoll
    }

    /// Test basic queue/dequeue functionality of `EventQueue`.
    #[test]
    fn event_queue() {
        let mut event_queue = EventQueue::new().unwrap();

        event_queue
            .queue_event(VideoDecoderBackendEvent::InputBufferDone(1))
            .unwrap();
        assert_eq!(event_queue.len(), 1);
        event_queue
            .queue_event(VideoDecoderBackendEvent::FrameCompleted {
                buffer_id: 1,
                timestamp: bindings::timeval {
                    tv_sec: 10,
                    tv_usec: 42,
                },
                bytes_used: vec![1024],
                is_last: false,
            })
            .unwrap();
        assert_eq!(event_queue.len(), 2);

        // Events must come out in the order they were queued.
        assert_eq!(
            event_queue.dequeue_event(),
            Some(VideoDecoderBackendEvent::InputBufferDone(1))
        );
        assert_eq!(event_queue.len(), 1);
        assert_eq!(
            event_queue.dequeue_event(),
            Some(VideoDecoderBackendEvent::FrameCompleted {
                buffer_id: 1,
                timestamp: bindings::timeval {
                    tv_sec: 10,
                    tv_usec: 42
                },
                bytes_used: vec![1024],
                is_last: false,
            })
        );
        assert_eq!(event_queue.len(), 0);
    }

    /// Test that the FD of `EventQueue` signals exactly while events are pending.
    #[test]
    fn decoder_event_queue_polling() {
        let mut event_queue = EventQueue::new().unwrap();
        let epoll = watch(&event_queue);

        // The queue is empty, so the FD should not signal.
        assert_eq!(ready_fds(&epoll), 0);
        assert_eq!(event_queue.dequeue_event(), None);

        // The FD should signal as long as the queue is not empty.
        for id in 1..=3 {
            event_queue
                .queue_event(VideoDecoderBackendEvent::InputBufferDone(id))
                .unwrap();
            assert_eq!(ready_fds(&epoll), 1);
        }

        assert_eq!(
            event_queue.dequeue_event(),
            Some(VideoDecoderBackendEvent::InputBufferDone(1))
        );
        assert_eq!(ready_fds(&epoll), 1);
        assert_eq!(
            event_queue.dequeue_event(),
            Some(VideoDecoderBackendEvent::InputBufferDone(2))
        );
        assert_eq!(ready_fds(&epoll), 1);
        assert_eq!(
            event_queue.dequeue_event(),
            Some(VideoDecoderBackendEvent::InputBufferDone(3))
        );

        // The queue is empty again, so the FD should not signal.
        assert_eq!(ready_fds(&epoll), 0);
        assert_eq!(event_queue.dequeue_event(), None);
    }

    /// Test that `retain` filters events and keeps the FD state in sync with the queue.
    #[test]
    fn event_queue_retain() {
        let mut event_queue = EventQueue::new().unwrap();
        let epoll = watch(&event_queue);

        // Retaining on an empty queue must leave the FD unsignaled.
        event_queue.retain(|_| true);
        assert_eq!(event_queue.len(), 0);
        assert_eq!(ready_fds(&epoll), 0);

        for id in 1..=3 {
            event_queue
                .queue_event(VideoDecoderBackendEvent::InputBufferDone(id))
                .unwrap();
        }

        // Removing some of the events keeps the FD signaled and preserves ordering.
        event_queue.retain(|event| !matches!(event, VideoDecoderBackendEvent::InputBufferDone(2)));
        assert_eq!(event_queue.len(), 2);
        assert_eq!(ready_fds(&epoll), 1);
        assert_eq!(
            event_queue.dequeue_event(),
            Some(VideoDecoderBackendEvent::InputBufferDone(1))
        );
        assert_eq!(ready_fds(&epoll), 1);

        // Removing every remaining event must clear the signal.
        event_queue.retain(|_| false);
        assert_eq!(event_queue.len(), 0);
        assert_eq!(ready_fds(&epoll), 0);
        assert_eq!(event_queue.dequeue_event(), None);
    }
}
182