1 //! A level-triggered `Poller` for V4L2 devices that allows a user to be notified
2 //! when a CAPTURE or OUTPUT buffer is ready to be dequeued, or when a V4L2
3 //! event is ready to be dequeued.
4 //!
5 //! It also provides a `Waker` companion that allows other threads to interrupt
6 //! an ongoing (or coming) poll. Useful to implement an event-based loop.
7 
8 use std::{
9     collections::BTreeMap,
10     convert::TryFrom,
11     io,
12     os::fd::{AsFd, BorrowedFd},
13     sync::{
14         atomic::{AtomicUsize, Ordering},
15         Arc,
16     },
17     task::Wake,
18 };
19 
20 use log::{error, warn};
21 use nix::{
22     errno::Errno,
23     poll::PollTimeout,
24     sys::{
25         epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags},
26         eventfd::{EfdFlags, EventFd},
27     },
28 };
29 use thiserror::Error;
30 
31 use crate::device::Device;
32 
33 #[derive(Debug, PartialEq)]
34 pub enum DeviceEvent {
35     CaptureReady,
36     OutputReady,
37     V4L2Event,
38 }
39 
40 #[derive(Debug, PartialEq)]
41 pub enum PollEvent {
42     Device(DeviceEvent),
43     Waker(u32),
44 }
45 
46 pub struct PollEvents {
47     events: [EpollEvent; 4],
48     nb_events: usize,
49     cur_event: usize,
50 }
51 
52 impl PollEvents {
new() -> Self53     fn new() -> Self {
54         PollEvents {
55             events: [
56                 EpollEvent::empty(),
57                 EpollEvent::empty(),
58                 EpollEvent::empty(),
59                 EpollEvent::empty(),
60             ],
61             nb_events: 0,
62             cur_event: 0,
63         }
64     }
65 }
66 
67 impl Iterator for PollEvents {
68     type Item = PollEvent;
69 
next(&mut self) -> Option<Self::Item>70     fn next(&mut self) -> Option<Self::Item> {
71         // No more slot to process, end of iterator.
72         if self.cur_event >= self.nb_events {
73             return None;
74         }
75 
76         let slot = &mut self.events[self.cur_event];
77         match slot.data() {
78             DEVICE_ID => {
79                 // Figure out which event to return next, if any for this slot.
80                 if slot.events().contains(EpollFlags::EPOLLOUT) {
81                     *slot = EpollEvent::new(
82                         slot.events().difference(EpollFlags::EPOLLOUT),
83                         slot.data(),
84                     );
85                     Some(PollEvent::Device(DeviceEvent::OutputReady))
86                 } else if slot.events().contains(EpollFlags::EPOLLIN) {
87                     *slot =
88                         EpollEvent::new(slot.events().difference(EpollFlags::EPOLLIN), slot.data());
89                     Some(PollEvent::Device(DeviceEvent::CaptureReady))
90                 } else if slot.events().contains(EpollFlags::EPOLLPRI) {
91                     *slot = EpollEvent::new(
92                         slot.events().difference(EpollFlags::EPOLLPRI),
93                         slot.data(),
94                     );
95                     Some(PollEvent::Device(DeviceEvent::V4L2Event))
96                 } else {
97                     // If no more events for this slot, try the next one.
98                     self.cur_event += 1;
99                     self.next()
100                 }
101             }
102             waker_id @ FIRST_WAKER_ID..=LAST_WAKER_ID => {
103                 self.cur_event += 1;
104                 Some(PollEvent::Waker(waker_id as u32))
105             }
106             _ => panic!("Unregistered token returned by epoll_wait!"),
107         }
108     }
109 }
110 
111 pub struct Waker {
112     fd: EventFd,
113 }
114 
115 impl Waker {
new() -> io::Result<Self>116     fn new() -> io::Result<Self> {
117         Ok(Waker {
118             fd: EventFd::from_value_and_flags(0, EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)?,
119         })
120     }
121 
122     /// Users will want to use the `wake()` method on an `Arc<Waker>`.
wake_direct(&self) -> nix::Result<()>123     fn wake_direct(&self) -> nix::Result<()> {
124         // Files support concurrent access at the OS level. The implementation
125         // of Write for &File lets us call the write mutable method even on a
126         // non-mutable File instance.
127         self.fd.write(1).map(|_| ())
128     }
129 
130     /// Perform a read on this waker in order to reset its counter to 0. This
131     /// means it will make subsequent calls to `poll()` block until `wake()` is
132     /// called again.
reset(&self) -> nix::Result<()>133     fn reset(&self) -> nix::Result<()> {
134         match self.fd.read() {
135             // If the counter was already zero, it is already reset so this is
136             // not an error.
137             Ok(_) | Err(Errno::EAGAIN) => Ok(()),
138             Err(e) => Err(e),
139         }
140     }
141 }
142 
143 impl Wake for Waker {
wake(self: Arc<Self>)144     fn wake(self: Arc<Self>) {
145         self.wake_direct().unwrap_or_else(|e| {
146             error!("Failed to signal Waker: {}", e);
147         });
148     }
149 }
150 
151 pub struct Poller {
152     device: Arc<Device>,
153     wakers: BTreeMap<u32, Arc<Waker>>,
154     epoll: Epoll,
155 
156     // Whether or not to listen to specific device events.
157     capture_enabled: bool,
158     output_enabled: bool,
159     events_enabled: bool,
160 
161     // If set, incremented every time we wake up from a poll.
162     poll_wakeups_counter: Option<Arc<AtomicUsize>>,
163 }
164 
165 /// Wakers IDs range.
166 const FIRST_WAKER_ID: u64 = 0;
167 const LAST_WAKER_ID: u64 = DEVICE_ID - 1;
168 /// Give us a comfortable range of 4 billion ids usable for wakers.
169 const DEVICE_ID: u64 = 1 << 32;
170 
171 #[derive(Debug, Error)]
172 pub enum PollError {
173     #[error("timeout value too large for epoll")]
174     TimeoutTryFromError,
175     #[error("error during call to epoll_wait: {0}")]
176     EPollWait(nix::Error),
177     #[error("error while resetting the waker: {0}")]
178     WakerReset(nix::Error),
179     #[error("V4L2 device returned EPOLLERR")]
180     V4L2Device,
181 }
182 
183 impl Poller {
new(device: Arc<Device>) -> nix::Result<Self>184     pub fn new(device: Arc<Device>) -> nix::Result<Self> {
185         let epoll = Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC)?;
186 
187         // Register our device.
188         // There is a bug in some Linux kernels (at least 5.9 and older) where EPOLLIN
189         // and EPOLLOUT events wont be signaled to epoll if the first call to epoll did
190         // not include at least one of EPOLLIN or EPOLLOUT as desired events.
191         // Make sure we don't fall into this trap by registering EPOLLIN first and doing
192         // a dummy poll call. This call will immediately return with an error because the
193         // CAPTURE queue is not streaming, but it will set the right hooks in the kernel
194         // and we can now reconfigure our events to only include EPOLLPRI and have poll
195         // working as expected.
196         epoll.add(&device, EpollEvent::new(EpollFlags::EPOLLIN, DEVICE_ID))?;
197         // This call should return an EPOLLERR event immediately. But it will
198         // also ensure that the CAPTURE and OUTPUT poll handlers are registered
199         // in the kernel for our device.
200         epoll.wait(&mut [EpollEvent::empty()], 10u8)?;
201         // Now reset our device events. We must keep it registered for the
202         // workaround's effect to persist.
203         epoll.modify(
204             &device,
205             &mut EpollEvent::new(EpollFlags::empty(), DEVICE_ID),
206         )?;
207 
208         Ok(Poller {
209             device,
210             wakers: BTreeMap::new(),
211             epoll,
212             capture_enabled: false,
213             output_enabled: false,
214             events_enabled: false,
215             poll_wakeups_counter: None,
216         })
217     }
218 
219     /// Create a `Waker` with identifier `id` and start polling on it. Returns
220     /// the `Waker` if successful, or an error if `id` was already in use or the
221     /// waker could not be polled on.
add_waker(&mut self, id: u32) -> io::Result<Arc<Waker>>222     pub fn add_waker(&mut self, id: u32) -> io::Result<Arc<Waker>> {
223         match self.wakers.entry(id) {
224             std::collections::btree_map::Entry::Vacant(entry) => {
225                 let waker = Waker::new()?;
226 
227                 self.epoll.add(
228                     &waker.fd,
229                     EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID + id as u64),
230                 )?;
231 
232                 let waker = Arc::new(waker);
233                 entry.insert(Arc::clone(&waker));
234                 Ok(waker)
235             }
236             std::collections::btree_map::Entry::Occupied(_) => Err(io::Error::new(
237                 io::ErrorKind::AlreadyExists,
238                 format!("A waker with id {} is already registered", id),
239             )),
240         }
241     }
242 
remove_waker(&mut self, id: u32) -> io::Result<Arc<Waker>>243     pub fn remove_waker(&mut self, id: u32) -> io::Result<Arc<Waker>> {
244         match self.wakers.entry(id) {
245             std::collections::btree_map::Entry::Vacant(_) => Err(io::Error::new(
246                 io::ErrorKind::AlreadyExists,
247                 format!("No waker with id {} in this poller", id),
248             )),
249             std::collections::btree_map::Entry::Occupied(entry) => {
250                 self.epoll.delete(&entry.get().fd)?;
251 
252                 Ok(entry.remove())
253             }
254         }
255     }
256 
set_poll_counter(&mut self, poll_wakeup_counter: Arc<AtomicUsize>)257     pub fn set_poll_counter(&mut self, poll_wakeup_counter: Arc<AtomicUsize>) {
258         self.poll_wakeups_counter = Some(poll_wakeup_counter);
259     }
260 
update_device_registration(&mut self) -> nix::Result<()>261     fn update_device_registration(&mut self) -> nix::Result<()> {
262         let mut epoll_flags = EpollFlags::empty();
263         if self.capture_enabled {
264             epoll_flags.insert(EpollFlags::EPOLLIN);
265         }
266         if self.output_enabled {
267             epoll_flags.insert(EpollFlags::EPOLLOUT);
268         }
269         if self.events_enabled {
270             epoll_flags.insert(EpollFlags::EPOLLPRI);
271         }
272 
273         let mut epoll_event = EpollEvent::new(epoll_flags, DEVICE_ID);
274 
275         self.epoll
276             .modify(&self.device, &mut epoll_event)
277             .map(|_| ())
278     }
279 
set_event(&mut self, event: DeviceEvent, enable: bool) -> nix::Result<()>280     fn set_event(&mut self, event: DeviceEvent, enable: bool) -> nix::Result<()> {
281         let event = match event {
282             DeviceEvent::CaptureReady => &mut self.capture_enabled,
283             DeviceEvent::OutputReady => &mut self.output_enabled,
284             DeviceEvent::V4L2Event => &mut self.events_enabled,
285         };
286 
287         // Do not alter event if it was already in the desired state.
288         if *event == enable {
289             return Ok(());
290         }
291 
292         *event = enable;
293         self.update_device_registration()
294     }
295 
296     /// Enable listening to (and reporting) `event`.
enable_event(&mut self, event: DeviceEvent) -> nix::Result<()>297     pub fn enable_event(&mut self, event: DeviceEvent) -> nix::Result<()> {
298         self.set_event(event, true)
299     }
300 
301     /// Disable listening to (and reporting of) `event`.
disable_event(&mut self, event: DeviceEvent) -> nix::Result<()>302     pub fn disable_event(&mut self, event: DeviceEvent) -> nix::Result<()> {
303         self.set_event(event, false)
304     }
305 
306     /// Returns whether the given event is currently listened to.
is_event_enabled(&self, event: DeviceEvent) -> bool307     pub fn is_event_enabled(&self, event: DeviceEvent) -> bool {
308         match event {
309             DeviceEvent::CaptureReady => self.capture_enabled,
310             DeviceEvent::OutputReady => self.output_enabled,
311             DeviceEvent::V4L2Event => self.events_enabled,
312         }
313     }
314 
poll(&mut self, duration: Option<std::time::Duration>) -> Result<PollEvents, PollError>315     pub fn poll(&mut self, duration: Option<std::time::Duration>) -> Result<PollEvents, PollError> {
316         let mut events = PollEvents::new();
317         let duration: PollTimeout = match duration {
318             None => PollTimeout::NONE,
319             Some(d) => PollTimeout::try_from(d).map_err(|_| PollError::TimeoutTryFromError)?,
320         };
321 
322         events.nb_events = self
323             .epoll
324             .wait(&mut events.events, duration)
325             .map_err(PollError::EPollWait)?;
326 
327         // Update our wake up stats
328         if let Some(wakeup_counter) = &self.poll_wakeups_counter {
329             wakeup_counter.fetch_add(1, Ordering::SeqCst);
330         }
331 
332         // Reset all the wakers that have been signaled.
333         for event in &events.events[0..events.nb_events] {
334             if event.data() <= LAST_WAKER_ID {
335                 match self.wakers.get(&(event.data() as u32)) {
336                     Some(waker) => waker.reset().map_err(PollError::WakerReset)?,
337                     None => warn!("Unregistered waker has been signaled."),
338                 }
339             }
340         }
341 
342         for event in &events.events[0..events.nb_events] {
343             if event.data() == DEVICE_ID && event.events().contains(EpollFlags::EPOLLERR) {
344                 error!("V4L2 device returned EPOLLERR!");
345                 return Err(PollError::V4L2Device);
346             }
347         }
348 
349         Ok(events)
350     }
351 }
352 
353 impl AsFd for Poller {
as_fd(&self) -> BorrowedFd354     fn as_fd(&self) -> BorrowedFd {
355         self.epoll.0.as_fd()
356     }
357 }
358 
359 #[cfg(test)]
360 mod tests {
361     use std::os::fd::AsFd;
362 
363     use super::{DeviceEvent::*, PollEvent, PollEvents, Waker};
364     use super::{DEVICE_ID, FIRST_WAKER_ID};
365     use nix::poll::PollTimeout;
366     use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags};
367 
368     #[test]
test_waker()369     fn test_waker() {
370         let waker = Waker::new().unwrap();
371         let epoll = Epoll::new(EpollCreateFlags::empty()).unwrap();
372         let mut event = [EpollEvent::empty()];
373 
374         epoll
375             .add(
376                 waker.fd.as_fd(),
377                 EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID),
378             )
379             .unwrap();
380 
381         // Waker should initially not be signaled.
382         let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap();
383         assert_eq!(nb_events, 0);
384 
385         // Waking up should signal.
386         waker.wake_direct().unwrap();
387         let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap();
388         assert_eq!(nb_events, 1);
389         assert_eq!(
390             event[0],
391             EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID)
392         );
393 
394         // Waking up twice should still signal.
395         waker.wake_direct().unwrap();
396         let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap();
397         assert_eq!(nb_events, 1);
398         assert_eq!(
399             event[0],
400             EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID)
401         );
402 
403         // Calling reset should stop signaling.
404         waker.reset().unwrap();
405         let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap();
406         assert_eq!(nb_events, 0);
407 
408         // Calling reset while at rest should be a no-op.
409         waker.reset().unwrap();
410         let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap();
411         assert_eq!(nb_events, 0);
412     }
413 
414     #[test]
test_pollevents_iterator()415     fn test_pollevents_iterator() {
416         let mut poll_events = PollEvents::new();
417         assert_eq!(poll_events.next(), None);
418 
419         // Single device events
420         let mut poll_events = PollEvents::new();
421         poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLIN, DEVICE_ID);
422         poll_events.nb_events = 1;
423         assert_eq!(poll_events.next(), Some(PollEvent::Device(CaptureReady)));
424         assert_eq!(poll_events.next(), None);
425 
426         let mut poll_events = PollEvents::new();
427         poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLOUT, DEVICE_ID);
428         poll_events.nb_events = 1;
429         assert_eq!(poll_events.next(), Some(PollEvent::Device(OutputReady)));
430         assert_eq!(poll_events.next(), None);
431 
432         let mut poll_events = PollEvents::new();
433         poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLPRI, DEVICE_ID);
434         poll_events.nb_events = 1;
435         assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event)));
436         assert_eq!(poll_events.next(), None);
437 
438         // Multiple device events in one event
439         let mut poll_events = PollEvents::new();
440         poll_events.events[0] =
441             EpollEvent::new(EpollFlags::EPOLLPRI | EpollFlags::EPOLLOUT, DEVICE_ID);
442         poll_events.nb_events = 1;
443         assert_eq!(poll_events.next(), Some(PollEvent::Device(OutputReady)));
444         assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event)));
445         assert_eq!(poll_events.next(), None);
446 
447         // Separated device events
448         let mut poll_events = PollEvents::new();
449         poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLIN, DEVICE_ID);
450         poll_events.events[1] =
451             EpollEvent::new(EpollFlags::EPOLLPRI | EpollFlags::EPOLLOUT, DEVICE_ID);
452         poll_events.nb_events = 2;
453         assert_eq!(poll_events.next(), Some(PollEvent::Device(CaptureReady)));
454         assert_eq!(poll_events.next(), Some(PollEvent::Device(OutputReady)));
455         assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event)));
456         assert_eq!(poll_events.next(), None);
457 
458         // Single waker event
459         let mut poll_events = PollEvents::new();
460         poll_events.events[0] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID);
461         poll_events.nb_events = 1;
462         assert_eq!(poll_events.next(), Some(PollEvent::Waker(0)));
463         assert_eq!(poll_events.next(), None);
464 
465         // Multiple waker events
466         let mut poll_events = PollEvents::new();
467         poll_events.events[0] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 20);
468         poll_events.events[1] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 42);
469         poll_events.events[2] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID);
470         poll_events.nb_events = 3;
471         assert_eq!(poll_events.next(), Some(PollEvent::Waker(20)));
472         assert_eq!(poll_events.next(), Some(PollEvent::Waker(42)));
473         assert_eq!(poll_events.next(), Some(PollEvent::Waker(0)));
474         assert_eq!(poll_events.next(), None);
475 
476         // Wakers and device events
477         let mut poll_events = PollEvents::new();
478         poll_events.events[0] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 20);
479         poll_events.events[1] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 42);
480         poll_events.events[2] =
481             EpollEvent::new(EpollFlags::EPOLLPRI | EpollFlags::EPOLLIN, DEVICE_ID);
482         poll_events.events[3] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID);
483         poll_events.nb_events = 4;
484         assert_eq!(poll_events.next(), Some(PollEvent::Waker(20)));
485         assert_eq!(poll_events.next(), Some(PollEvent::Waker(42)));
486         assert_eq!(poll_events.next(), Some(PollEvent::Device(CaptureReady)));
487         assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event)));
488         assert_eq!(poll_events.next(), Some(PollEvent::Waker(0)));
489         assert_eq!(poll_events.next(), None);
490     }
491 }
492