1 //! A level-triggered `Poller` for V4L2 devices that allows a user to be notified 2 //! when a CAPTURE or OUTPUT buffer is ready to be dequeued, or when a V4L2 3 //! event is ready to be dequeued. 4 //! 5 //! It also provides a `Waker` companion that allows other threads to interrupt 6 //! an ongoing (or coming) poll. Useful to implement an event-based loop. 7 8 use std::{ 9 collections::BTreeMap, 10 convert::TryFrom, 11 io, 12 os::fd::{AsFd, BorrowedFd}, 13 sync::{ 14 atomic::{AtomicUsize, Ordering}, 15 Arc, 16 }, 17 task::Wake, 18 }; 19 20 use log::{error, warn}; 21 use nix::{ 22 errno::Errno, 23 poll::PollTimeout, 24 sys::{ 25 epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags}, 26 eventfd::{EfdFlags, EventFd}, 27 }, 28 }; 29 use thiserror::Error; 30 31 use crate::device::Device; 32 33 #[derive(Debug, PartialEq)] 34 pub enum DeviceEvent { 35 CaptureReady, 36 OutputReady, 37 V4L2Event, 38 } 39 40 #[derive(Debug, PartialEq)] 41 pub enum PollEvent { 42 Device(DeviceEvent), 43 Waker(u32), 44 } 45 46 pub struct PollEvents { 47 events: [EpollEvent; 4], 48 nb_events: usize, 49 cur_event: usize, 50 } 51 52 impl PollEvents { new() -> Self53 fn new() -> Self { 54 PollEvents { 55 events: [ 56 EpollEvent::empty(), 57 EpollEvent::empty(), 58 EpollEvent::empty(), 59 EpollEvent::empty(), 60 ], 61 nb_events: 0, 62 cur_event: 0, 63 } 64 } 65 } 66 67 impl Iterator for PollEvents { 68 type Item = PollEvent; 69 next(&mut self) -> Option<Self::Item>70 fn next(&mut self) -> Option<Self::Item> { 71 // No more slot to process, end of iterator. 72 if self.cur_event >= self.nb_events { 73 return None; 74 } 75 76 let slot = &mut self.events[self.cur_event]; 77 match slot.data() { 78 DEVICE_ID => { 79 // Figure out which event to return next, if any for this slot. 80 if slot.events().contains(EpollFlags::EPOLLOUT) { 81 *slot = EpollEvent::new( 82 slot.events().difference(EpollFlags::EPOLLOUT), 83 slot.data(), 84 ); 85 Some(PollEvent::Device(DeviceEvent::OutputReady)) 86 } else if slot.events().contains(EpollFlags::EPOLLIN) { 87 *slot = 88 EpollEvent::new(slot.events().difference(EpollFlags::EPOLLIN), slot.data()); 89 Some(PollEvent::Device(DeviceEvent::CaptureReady)) 90 } else if slot.events().contains(EpollFlags::EPOLLPRI) { 91 *slot = EpollEvent::new( 92 slot.events().difference(EpollFlags::EPOLLPRI), 93 slot.data(), 94 ); 95 Some(PollEvent::Device(DeviceEvent::V4L2Event)) 96 } else { 97 // If no more events for this slot, try the next one. 98 self.cur_event += 1; 99 self.next() 100 } 101 } 102 waker_id @ FIRST_WAKER_ID..=LAST_WAKER_ID => { 103 self.cur_event += 1; 104 Some(PollEvent::Waker(waker_id as u32)) 105 } 106 _ => panic!("Unregistered token returned by epoll_wait!"), 107 } 108 } 109 } 110 111 pub struct Waker { 112 fd: EventFd, 113 } 114 115 impl Waker { new() -> io::Result<Self>116 fn new() -> io::Result<Self> { 117 Ok(Waker { 118 fd: EventFd::from_value_and_flags(0, EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)?, 119 }) 120 } 121 122 /// Users will want to use the `wake()` method on an `Arc<Waker>`. wake_direct(&self) -> nix::Result<()>123 fn wake_direct(&self) -> nix::Result<()> { 124 // Files support concurrent access at the OS level. The implementation 125 // of Write for &File lets us call the write mutable method even on a 126 // non-mutable File instance. 127 self.fd.write(1).map(|_| ()) 128 } 129 130 /// Perform a read on this waker in order to reset its counter to 0. This 131 /// means it will make subsequent calls to `poll()` block until `wake()` is 132 /// called again. reset(&self) -> nix::Result<()>133 fn reset(&self) -> nix::Result<()> { 134 match self.fd.read() { 135 // If the counter was already zero, it is already reset so this is 136 // not an error. 137 Ok(_) | Err(Errno::EAGAIN) => Ok(()), 138 Err(e) => Err(e), 139 } 140 } 141 } 142 143 impl Wake for Waker { wake(self: Arc<Self>)144 fn wake(self: Arc<Self>) { 145 self.wake_direct().unwrap_or_else(|e| { 146 error!("Failed to signal Waker: {}", e); 147 }); 148 } 149 } 150 151 pub struct Poller { 152 device: Arc<Device>, 153 wakers: BTreeMap<u32, Arc<Waker>>, 154 epoll: Epoll, 155 156 // Whether or not to listen to specific device events. 157 capture_enabled: bool, 158 output_enabled: bool, 159 events_enabled: bool, 160 161 // If set, incremented every time we wake up from a poll. 162 poll_wakeups_counter: Option<Arc<AtomicUsize>>, 163 } 164 165 /// Wakers IDs range. 166 const FIRST_WAKER_ID: u64 = 0; 167 const LAST_WAKER_ID: u64 = DEVICE_ID - 1; 168 /// Give us a comfortable range of 4 billion ids usable for wakers. 169 const DEVICE_ID: u64 = 1 << 32; 170 171 #[derive(Debug, Error)] 172 pub enum PollError { 173 #[error("timeout value too large for epoll")] 174 TimeoutTryFromError, 175 #[error("error during call to epoll_wait: {0}")] 176 EPollWait(nix::Error), 177 #[error("error while resetting the waker: {0}")] 178 WakerReset(nix::Error), 179 #[error("V4L2 device returned EPOLLERR")] 180 V4L2Device, 181 } 182 183 impl Poller { new(device: Arc<Device>) -> nix::Result<Self>184 pub fn new(device: Arc<Device>) -> nix::Result<Self> { 185 let epoll = Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC)?; 186 187 // Register our device. 188 // There is a bug in some Linux kernels (at least 5.9 and older) where EPOLLIN 189 // and EPOLLOUT events wont be signaled to epoll if the first call to epoll did 190 // not include at least one of EPOLLIN or EPOLLOUT as desired events. 191 // Make sure we don't fall into this trap by registering EPOLLIN first and doing 192 // a dummy poll call. This call will immediately return with an error because the 193 // CAPTURE queue is not streaming, but it will set the right hooks in the kernel 194 // and we can now reconfigure our events to only include EPOLLPRI and have poll 195 // working as expected. 196 epoll.add(&device, EpollEvent::new(EpollFlags::EPOLLIN, DEVICE_ID))?; 197 // This call should return an EPOLLERR event immediately. But it will 198 // also ensure that the CAPTURE and OUTPUT poll handlers are registered 199 // in the kernel for our device. 200 epoll.wait(&mut [EpollEvent::empty()], 10u8)?; 201 // Now reset our device events. We must keep it registered for the 202 // workaround's effect to persist. 203 epoll.modify( 204 &device, 205 &mut EpollEvent::new(EpollFlags::empty(), DEVICE_ID), 206 )?; 207 208 Ok(Poller { 209 device, 210 wakers: BTreeMap::new(), 211 epoll, 212 capture_enabled: false, 213 output_enabled: false, 214 events_enabled: false, 215 poll_wakeups_counter: None, 216 }) 217 } 218 219 /// Create a `Waker` with identifier `id` and start polling on it. Returns 220 /// the `Waker` if successful, or an error if `id` was already in use or the 221 /// waker could not be polled on. add_waker(&mut self, id: u32) -> io::Result<Arc<Waker>>222 pub fn add_waker(&mut self, id: u32) -> io::Result<Arc<Waker>> { 223 match self.wakers.entry(id) { 224 std::collections::btree_map::Entry::Vacant(entry) => { 225 let waker = Waker::new()?; 226 227 self.epoll.add( 228 &waker.fd, 229 EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID + id as u64), 230 )?; 231 232 let waker = Arc::new(waker); 233 entry.insert(Arc::clone(&waker)); 234 Ok(waker) 235 } 236 std::collections::btree_map::Entry::Occupied(_) => Err(io::Error::new( 237 io::ErrorKind::AlreadyExists, 238 format!("A waker with id {} is already registered", id), 239 )), 240 } 241 } 242 remove_waker(&mut self, id: u32) -> io::Result<Arc<Waker>>243 pub fn remove_waker(&mut self, id: u32) -> io::Result<Arc<Waker>> { 244 match self.wakers.entry(id) { 245 std::collections::btree_map::Entry::Vacant(_) => Err(io::Error::new( 246 io::ErrorKind::AlreadyExists, 247 format!("No waker with id {} in this poller", id), 248 )), 249 std::collections::btree_map::Entry::Occupied(entry) => { 250 self.epoll.delete(&entry.get().fd)?; 251 252 Ok(entry.remove()) 253 } 254 } 255 } 256 set_poll_counter(&mut self, poll_wakeup_counter: Arc<AtomicUsize>)257 pub fn set_poll_counter(&mut self, poll_wakeup_counter: Arc<AtomicUsize>) { 258 self.poll_wakeups_counter = Some(poll_wakeup_counter); 259 } 260 update_device_registration(&mut self) -> nix::Result<()>261 fn update_device_registration(&mut self) -> nix::Result<()> { 262 let mut epoll_flags = EpollFlags::empty(); 263 if self.capture_enabled { 264 epoll_flags.insert(EpollFlags::EPOLLIN); 265 } 266 if self.output_enabled { 267 epoll_flags.insert(EpollFlags::EPOLLOUT); 268 } 269 if self.events_enabled { 270 epoll_flags.insert(EpollFlags::EPOLLPRI); 271 } 272 273 let mut epoll_event = EpollEvent::new(epoll_flags, DEVICE_ID); 274 275 self.epoll 276 .modify(&self.device, &mut epoll_event) 277 .map(|_| ()) 278 } 279 set_event(&mut self, event: DeviceEvent, enable: bool) -> nix::Result<()>280 fn set_event(&mut self, event: DeviceEvent, enable: bool) -> nix::Result<()> { 281 let event = match event { 282 DeviceEvent::CaptureReady => &mut self.capture_enabled, 283 DeviceEvent::OutputReady => &mut self.output_enabled, 284 DeviceEvent::V4L2Event => &mut self.events_enabled, 285 }; 286 287 // Do not alter event if it was already in the desired state. 288 if *event == enable { 289 return Ok(()); 290 } 291 292 *event = enable; 293 self.update_device_registration() 294 } 295 296 /// Enable listening to (and reporting) `event`. enable_event(&mut self, event: DeviceEvent) -> nix::Result<()>297 pub fn enable_event(&mut self, event: DeviceEvent) -> nix::Result<()> { 298 self.set_event(event, true) 299 } 300 301 /// Disable listening to (and reporting of) `event`. disable_event(&mut self, event: DeviceEvent) -> nix::Result<()>302 pub fn disable_event(&mut self, event: DeviceEvent) -> nix::Result<()> { 303 self.set_event(event, false) 304 } 305 306 /// Returns whether the given event is currently listened to. is_event_enabled(&self, event: DeviceEvent) -> bool307 pub fn is_event_enabled(&self, event: DeviceEvent) -> bool { 308 match event { 309 DeviceEvent::CaptureReady => self.capture_enabled, 310 DeviceEvent::OutputReady => self.output_enabled, 311 DeviceEvent::V4L2Event => self.events_enabled, 312 } 313 } 314 poll(&mut self, duration: Option<std::time::Duration>) -> Result<PollEvents, PollError>315 pub fn poll(&mut self, duration: Option<std::time::Duration>) -> Result<PollEvents, PollError> { 316 let mut events = PollEvents::new(); 317 let duration: PollTimeout = match duration { 318 None => PollTimeout::NONE, 319 Some(d) => PollTimeout::try_from(d).map_err(|_| PollError::TimeoutTryFromError)?, 320 }; 321 322 events.nb_events = self 323 .epoll 324 .wait(&mut events.events, duration) 325 .map_err(PollError::EPollWait)?; 326 327 // Update our wake up stats 328 if let Some(wakeup_counter) = &self.poll_wakeups_counter { 329 wakeup_counter.fetch_add(1, Ordering::SeqCst); 330 } 331 332 // Reset all the wakers that have been signaled. 333 for event in &events.events[0..events.nb_events] { 334 if event.data() <= LAST_WAKER_ID { 335 match self.wakers.get(&(event.data() as u32)) { 336 Some(waker) => waker.reset().map_err(PollError::WakerReset)?, 337 None => warn!("Unregistered waker has been signaled."), 338 } 339 } 340 } 341 342 for event in &events.events[0..events.nb_events] { 343 if event.data() == DEVICE_ID && event.events().contains(EpollFlags::EPOLLERR) { 344 error!("V4L2 device returned EPOLLERR!"); 345 return Err(PollError::V4L2Device); 346 } 347 } 348 349 Ok(events) 350 } 351 } 352 353 impl AsFd for Poller { as_fd(&self) -> BorrowedFd354 fn as_fd(&self) -> BorrowedFd { 355 self.epoll.0.as_fd() 356 } 357 } 358 359 #[cfg(test)] 360 mod tests { 361 use std::os::fd::AsFd; 362 363 use super::{DeviceEvent::*, PollEvent, PollEvents, Waker}; 364 use super::{DEVICE_ID, FIRST_WAKER_ID}; 365 use nix::poll::PollTimeout; 366 use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags}; 367 368 #[test] test_waker()369 fn test_waker() { 370 let waker = Waker::new().unwrap(); 371 let epoll = Epoll::new(EpollCreateFlags::empty()).unwrap(); 372 let mut event = [EpollEvent::empty()]; 373 374 epoll 375 .add( 376 waker.fd.as_fd(), 377 EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID), 378 ) 379 .unwrap(); 380 381 // Waker should initially not be signaled. 382 let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap(); 383 assert_eq!(nb_events, 0); 384 385 // Waking up should signal. 386 waker.wake_direct().unwrap(); 387 let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap(); 388 assert_eq!(nb_events, 1); 389 assert_eq!( 390 event[0], 391 EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID) 392 ); 393 394 // Waking up twice should still signal. 395 waker.wake_direct().unwrap(); 396 let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap(); 397 assert_eq!(nb_events, 1); 398 assert_eq!( 399 event[0], 400 EpollEvent::new(EpollFlags::EPOLLIN, FIRST_WAKER_ID) 401 ); 402 403 // Calling reset should stop signaling. 404 waker.reset().unwrap(); 405 let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap(); 406 assert_eq!(nb_events, 0); 407 408 // Calling reset while at rest should be a no-op. 409 waker.reset().unwrap(); 410 let nb_events = epoll.wait(&mut event, PollTimeout::ZERO).unwrap(); 411 assert_eq!(nb_events, 0); 412 } 413 414 #[test] test_pollevents_iterator()415 fn test_pollevents_iterator() { 416 let mut poll_events = PollEvents::new(); 417 assert_eq!(poll_events.next(), None); 418 419 // Single device events 420 let mut poll_events = PollEvents::new(); 421 poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLIN, DEVICE_ID); 422 poll_events.nb_events = 1; 423 assert_eq!(poll_events.next(), Some(PollEvent::Device(CaptureReady))); 424 assert_eq!(poll_events.next(), None); 425 426 let mut poll_events = PollEvents::new(); 427 poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLOUT, DEVICE_ID); 428 poll_events.nb_events = 1; 429 assert_eq!(poll_events.next(), Some(PollEvent::Device(OutputReady))); 430 assert_eq!(poll_events.next(), None); 431 432 let mut poll_events = PollEvents::new(); 433 poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLPRI, DEVICE_ID); 434 poll_events.nb_events = 1; 435 assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event))); 436 assert_eq!(poll_events.next(), None); 437 438 // Multiple device events in one event 439 let mut poll_events = PollEvents::new(); 440 poll_events.events[0] = 441 EpollEvent::new(EpollFlags::EPOLLPRI | EpollFlags::EPOLLOUT, DEVICE_ID); 442 poll_events.nb_events = 1; 443 assert_eq!(poll_events.next(), Some(PollEvent::Device(OutputReady))); 444 assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event))); 445 assert_eq!(poll_events.next(), None); 446 447 // Separated device events 448 let mut poll_events = PollEvents::new(); 449 poll_events.events[0] = EpollEvent::new(EpollFlags::EPOLLIN, DEVICE_ID); 450 poll_events.events[1] = 451 EpollEvent::new(EpollFlags::EPOLLPRI | EpollFlags::EPOLLOUT, DEVICE_ID); 452 poll_events.nb_events = 2; 453 assert_eq!(poll_events.next(), Some(PollEvent::Device(CaptureReady))); 454 assert_eq!(poll_events.next(), Some(PollEvent::Device(OutputReady))); 455 assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event))); 456 assert_eq!(poll_events.next(), None); 457 458 // Single waker event 459 let mut poll_events = PollEvents::new(); 460 poll_events.events[0] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID); 461 poll_events.nb_events = 1; 462 assert_eq!(poll_events.next(), Some(PollEvent::Waker(0))); 463 assert_eq!(poll_events.next(), None); 464 465 // Multiple waker events 466 let mut poll_events = PollEvents::new(); 467 poll_events.events[0] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 20); 468 poll_events.events[1] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 42); 469 poll_events.events[2] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID); 470 poll_events.nb_events = 3; 471 assert_eq!(poll_events.next(), Some(PollEvent::Waker(20))); 472 assert_eq!(poll_events.next(), Some(PollEvent::Waker(42))); 473 assert_eq!(poll_events.next(), Some(PollEvent::Waker(0))); 474 assert_eq!(poll_events.next(), None); 475 476 // Wakers and device events 477 let mut poll_events = PollEvents::new(); 478 poll_events.events[0] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 20); 479 poll_events.events[1] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID + 42); 480 poll_events.events[2] = 481 EpollEvent::new(EpollFlags::EPOLLPRI | EpollFlags::EPOLLIN, DEVICE_ID); 482 poll_events.events[3] = EpollEvent::new(EpollFlags::empty(), FIRST_WAKER_ID); 483 poll_events.nb_events = 4; 484 assert_eq!(poll_events.next(), Some(PollEvent::Waker(20))); 485 assert_eq!(poll_events.next(), Some(PollEvent::Waker(42))); 486 assert_eq!(poll_events.next(), Some(PollEvent::Device(CaptureReady))); 487 assert_eq!(poll_events.next(), Some(PollEvent::Device(V4L2Event))); 488 assert_eq!(poll_events.next(), Some(PollEvent::Waker(0))); 489 assert_eq!(poll_events.next(), None); 490 } 491 } 492