1 use crate::{
2 decoder::{
3 stateful::{CaptureThreadResponse, DecoderCommand, DecoderEvent, DrainError},
4 DecoderEventCallback, FormatChangedCallback, FormatChangedReply,
5 },
6 device::{
7 poller::{DeviceEvent, PollEvent, Poller, Waker},
8 queue::{
9 self,
10 direction::Capture,
11 handles_provider::HandlesProvider,
12 qbuf::{
13 get_free::GetFreeCaptureBuffer, get_indexed::GetCaptureBufferByIndex,
14 CaptureQueueable,
15 },
16 BuffersAllocated, Queue, QueueInit,
17 },
18 AllocatedQueue, Device, Stream, TryDequeue,
19 },
20 ioctl::{self, SelectionTarget},
21 };
22
23 use std::{
24 io,
25 sync::{mpsc, Arc},
26 task::Wake,
27 };
28
29 use log::{debug, error, trace, warn};
30 use thiserror::Error;
31
32 /// Check if `device` has a dynamic resolution change event pending.
33 ///
34 /// Dequeues all pending V4L2 events and returns `true` if a
35 /// SRC_CHANGE_EVENT (indicating a format change on the CAPTURE queue) was
36 /// detected. This consumes all the event, meaning that if this method
37 /// returned `true` once it will return `false` until a new resolution
38 /// change happens in the stream.
is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError>39 fn is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError> {
40 let mut drc_pending = false;
41
42 loop {
43 // TODO what if we used an iterator here?
44 let event = match ioctl::dqevent(device) {
45 Ok(event) => event,
46 Err(ioctl::DqEventError::NotReady) => return Ok(drc_pending),
47 Err(e) => return Err(e),
48 };
49
50 match event {
51 ioctl::Event::SrcChangeEvent(changes) => {
52 if changes.contains(ioctl::SrcChanges::RESOLUTION) {
53 debug!("Received resolution change event");
54 drc_pending = true;
55 }
56 }
57 ioctl::Event::Eos => {
58 debug!("Received EOS event");
59 }
60 }
61 }
62 }
63
/// State of the CAPTURE queue as seen by the capture thread.
///
/// The queue starts in `AwaitingResolution` and moves to `Decoding` once
/// `update_capture_format` has allocated buffers and started streaming. It is
/// returned to `AwaitingResolution` when `run` exits.
enum CaptureQueue<P: HandlesProvider> {
    /// No buffers are allocated yet; we are waiting for the initial resolution
    /// change event.
    AwaitingResolution {
        // CAPTURE queue in its initial (no buffers allocated) state.
        capture_queue: Queue<Capture, QueueInit>,
    },
    /// Buffers are allocated and the queue has been streamed on.
    Decoding {
        // CAPTURE queue with its buffers allocated.
        capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
        // Source of the handles that get queued with the CAPTURE buffers.
        provider: P,
        // Waker (registered under `CAPTURE_READY`) signaled when CAPTURE
        // buffers can be queued again, e.g. when a dequeued buffer is dropped.
        cap_buffer_waker: Arc<Waker>,
        // TODO not super elegant...
        // Set when a blocking drain has been requested and its `DrainDone`
        // response has not been sent yet.
        blocking_drain_in_progress: bool,
    },
}
76
/// State owned by the thread that services the CAPTURE queue of a stateful
/// decoder: it polls the device, dequeues decoded buffers, handles resolution
/// changes and executes the commands sent by the main thread.
pub(super) struct CaptureThread<P, DecoderEventCb, FormatChangedCb>
where
    P: HandlesProvider,
    DecoderEventCb: DecoderEventCallback<P>,
    FormatChangedCb: FormatChangedCallback<P>,
{
    // Device the CAPTURE queue belongs to; used for events and decoder
    // commands.
    device: Arc<Device>,
    // Current state of the CAPTURE queue.
    capture_queue: CaptureQueue<P>,
    // Poller used to wait for device events and wakers.
    pub(super) poller: Poller,

    // Callback invoked with decoder events (decoded frames, end of stream).
    event_cb: DecoderEventCb,
    // Callback invoked on a resolution change to let the client adjust the
    // CAPTURE format and supply the handles provider.
    set_capture_format_cb: FormatChangedCb,

    // Waker signaled when the main thread has commands pending for us.
    pub(super) command_waker: Arc<Waker>,
    // Receiver we read commands from when `command_waker` is signaled.
    command_receiver: mpsc::Receiver<DecoderCommand>,
    // Sender we use to send status messages after receiving commands from the
    // main thread.
    response_sender: mpsc::Sender<CaptureThreadResponse>,
}
98
/// Errors that can be returned by `update_capture_format`, one variant per
/// step of the CAPTURE queue reconfiguration.
#[derive(Debug, Error)]
enum UpdateCaptureError {
    // Enabling or disabling a device event on the poller failed.
    #[error("error while enabling poller events: {0}")]
    PollerEvents(io::Error),
    // The waker of the previous buffer pool could not be removed.
    #[error("error while removing CAPTURE waker: {0}")]
    RemoveWaker(io::Error),
    // The CAPTURE queue could not be streamed off.
    #[error("error while stopping CAPTURE queue: {0}")]
    Streamoff(#[from] ioctl::StreamOffError),
    // The previous CAPTURE buffers could not be freed.
    #[error("error while freeing CAPTURE buffers: {0}")]
    FreeBuffers(#[from] ioctl::ReqbufsError),
    // The current CAPTURE format could not be read.
    #[error("error while obtaining CAPTURE format: {0}")]
    GFmt(#[from] ioctl::GFmtError),
    // The selection rectangle could not be read from the CAPTURE queue.
    #[error("error while obtaining selection target from CAPTURE queue: {0}")]
    GSelection(#[from] ioctl::GSelectionError),
    // The client's format-changed callback returned an error.
    #[error("error while running the CAPTURE format callback: {0}")]
    Callback(#[from] anyhow::Error),
    // The new CAPTURE buffers could not be allocated.
    #[error("error while requesting CAPTURE buffers: {0}")]
    RequestBuffers(#[from] queue::RequestBuffersError),
    // The waker for the new buffer pool could not be registered.
    #[error("error while adding the CAPTURE buffer waker: {0}")]
    AddWaker(io::Error),
    // The CAPTURE queue could not be streamed on.
    #[error("error while streaming CAPTURE queue: {0}")]
    StreamOn(#[from] ioctl::StreamOnError),
}
122
// Poller waker id signaled when CAPTURE buffers can be (re)queued; handled in
// `run` by calling `enqueue_capture_buffers`.
const CAPTURE_READY: u32 = 1;
// Poller waker id signaled when the main thread has commands waiting in
// `command_receiver`.
const COMMAND_WAITING: u32 = 2;
125
/// Errors that can occur while processing device events.
// NOTE(review): this type is not referenced anywhere in the visible part of
// this file - confirm whether it is still needed.
#[derive(Debug, Error)]
enum ProcessEventsError {
    // A V4L2 event could not be dequeued.
    #[error("error while dequeueing event")]
    DqEvent(#[from] ioctl::DqEventError),
    // Buffers could not be requested.
    #[error("error while requesting buffers")]
    RequestBuffers(#[from] queue::RequestBuffersError),
    // The CAPTURE format could not be updated.
    #[error("error while updating CAPTURE format")]
    UpdateCapture(#[from] UpdateCaptureError),
}
135
136 impl<P, DecoderEventCb, FormatChangedCb> CaptureThread<P, DecoderEventCb, FormatChangedCb>
137 where
138 P: HandlesProvider,
139 DecoderEventCb: DecoderEventCallback<P>,
140 FormatChangedCb: FormatChangedCallback<P>,
141 for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
142 GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
143 {
new( device: &Arc<Device>, capture_queue: Queue<Capture, QueueInit>, event_cb: DecoderEventCb, set_capture_format_cb: FormatChangedCb, command_receiver: mpsc::Receiver<DecoderCommand>, response_sender: mpsc::Sender<CaptureThreadResponse>, ) -> io::Result<Self>144 pub(super) fn new(
145 device: &Arc<Device>,
146 capture_queue: Queue<Capture, QueueInit>,
147 event_cb: DecoderEventCb,
148 set_capture_format_cb: FormatChangedCb,
149 command_receiver: mpsc::Receiver<DecoderCommand>,
150 response_sender: mpsc::Sender<CaptureThreadResponse>,
151 ) -> io::Result<Self> {
152 // Start by only listening to V4L2 events in order to catch the initial
153 // resolution change, and to the stop waker in case the user had a
154 // change of heart about decoding something now.
155 let mut poller = Poller::new(Arc::clone(device))?;
156 poller.enable_event(DeviceEvent::V4L2Event)?;
157 let command_waker = poller.add_waker(COMMAND_WAITING)?;
158
159 let decoder_thread = CaptureThread {
160 device: Arc::clone(device),
161 capture_queue: CaptureQueue::AwaitingResolution { capture_queue },
162 poller,
163 event_cb,
164 set_capture_format_cb,
165 command_waker,
166 command_receiver,
167 response_sender,
168 };
169
170 Ok(decoder_thread)
171 }
172
send_response(&self, response: CaptureThreadResponse)173 fn send_response(&self, response: CaptureThreadResponse) {
174 trace!("Sending response: {:?}", response);
175
176 self.response_sender.send(response).unwrap();
177 }
178
drain(&mut self, blocking: bool)179 fn drain(&mut self, blocking: bool) {
180 trace!("Processing Drain({}) command", blocking);
181 let response = match &mut self.capture_queue {
182 // We cannot initiate the flush sequence before receiving the initial
183 // resolution.
184 CaptureQueue::AwaitingResolution { .. } => {
185 Some(CaptureThreadResponse::DrainDone(Err(DrainError::TryAgain)))
186 }
187 CaptureQueue::Decoding {
188 blocking_drain_in_progress,
189 ..
190 } => {
191 // We can receive the LAST buffer, send the STOP command
192 // and exit the loop once the buffer with the LAST tag is received.
193 ioctl::decoder_cmd::<_, ()>(&*self.device, ioctl::DecoderCmd::stop()).unwrap();
194 if blocking {
195 // If we are blocking, we will send the answer when the drain
196 // is completed.
197 *blocking_drain_in_progress = true;
198 None
199 } else {
200 // If not blocking, send the response now so the client can keep going.
201 Some(CaptureThreadResponse::DrainDone(Ok(false)))
202 }
203 }
204 };
205
206 if let Some(response) = response {
207 self.send_response(response);
208 }
209 }
210
flush(&mut self)211 fn flush(&mut self) {
212 trace!("Processing flush command");
213 match &mut self.capture_queue {
214 CaptureQueue::AwaitingResolution { .. } => {}
215 CaptureQueue::Decoding {
216 capture_queue,
217 blocking_drain_in_progress,
218 ..
219 } => {
220 // Stream the capture queue off and back on, dropping any queued
221 // buffer, and making the decoder ready to work again if it was
222 // halted.
223 capture_queue.stream_off().unwrap();
224 capture_queue.stream_on().unwrap();
225 *blocking_drain_in_progress = false;
226 }
227 }
228
229 self.send_response(CaptureThreadResponse::FlushDone(Ok(())));
230 self.enqueue_capture_buffers()
231 }
232
enqueue_capture_buffers(&mut self)233 fn enqueue_capture_buffers(&mut self) {
234 trace!("Queueing available CAPTURE buffers");
235 let (capture_queue, provider, cap_buffer_waker) = match &mut self.capture_queue {
236 // Capture queue is not set up yet, no buffers to queue.
237 CaptureQueue::AwaitingResolution { .. } => return,
238 CaptureQueue::Decoding {
239 capture_queue,
240 provider,
241 cap_buffer_waker,
242 ..
243 } => (capture_queue, provider, cap_buffer_waker),
244 };
245
246 // Requeue all available CAPTURE buffers.
247 'enqueue: while let Some(handles) = provider.get_handles(cap_buffer_waker) {
248 // TODO potential problem: the handles will be dropped if no V4L2 buffer
249 // is available. There is no guarantee that the provider will get them back
250 // in this case (e.g. with the C FFI).
251 let buffer = match provider.get_suitable_buffer_for(&handles, capture_queue) {
252 Ok(buffer) => buffer,
253 // It is possible that we run out of V4L2 buffers if there are more handles than
254 // buffers allocated. One example of this scenario is the `MmapProvider` which has
255 // an infinite number of handles. Break out of the loop when this happens - we will
256 // be called again the next time a CAPTURE buffer becomes available.
257 Err(queue::handles_provider::GetSuitableBufferError::TryGetFree(
258 queue::qbuf::get_free::GetFreeBufferError::NoFreeBuffer,
259 )) => {
260 break 'enqueue;
261 }
262 Err(e) => {
263 error!("Could not find suitable buffer for handles: {}", e);
264 warn!("Handles potentially lost due to no V4L2 buffer being available");
265 break 'enqueue;
266 }
267 };
268 match buffer.queue_with_handles(handles) {
269 Ok(()) => (),
270 Err(e) => error!("Error while queueing CAPTURE buffer: {}", e),
271 }
272 }
273 }
274
process_v4l2_event(mut self) -> Self275 fn process_v4l2_event(mut self) -> Self {
276 trace!("Processing V4L2 event");
277 match self.capture_queue {
278 CaptureQueue::AwaitingResolution { .. } => {
279 if is_drc_event_pending(&self.device).unwrap() {
280 self = self.update_capture_format().unwrap()
281 }
282 }
283 CaptureQueue::Decoding { .. } => unreachable!(),
284 }
285
286 self
287 }
288
update_capture_format(mut self) -> Result<Self, UpdateCaptureError>289 fn update_capture_format(mut self) -> Result<Self, UpdateCaptureError> {
290 debug!("Updating CAPTURE format");
291 // First reset the capture queue to the `Init` state if needed.
292 let mut capture_queue = match self.capture_queue {
293 // Initial resolution
294 CaptureQueue::AwaitingResolution { capture_queue } => {
295 // Stop listening to V4L2 events. We will check them when we get
296 // a buffer with the LAST flag.
297 self.poller
298 .disable_event(DeviceEvent::V4L2Event)
299 .map_err(Into::<io::Error>::into)
300 .map_err(UpdateCaptureError::PollerEvents)?;
301 // Listen to CAPTURE buffers being ready to dequeue, as we will
302 // be streaming soon.
303 self.poller
304 .enable_event(DeviceEvent::CaptureReady)
305 .map_err(Into::<io::Error>::into)
306 .map_err(UpdateCaptureError::PollerEvents)?;
307 capture_queue
308 }
309 // Dynamic resolution change
310 CaptureQueue::Decoding { capture_queue, .. } => {
311 // Remove the waker for the previous buffers pool, as we will
312 // get a new set of buffers.
313 self.poller
314 .remove_waker(CAPTURE_READY)
315 .map_err(UpdateCaptureError::RemoveWaker)?;
316 // Deallocate the queue and return it to the `Init` state. Good
317 // as new!
318 capture_queue.stream_off()?;
319 capture_queue.free_buffers()?.queue
320 }
321 };
322
323 // Now get the parameters of the new format and build our new CAPTURE
324 // queue.
325
326 // TODO use the proper control to get the right value.
327 let min_num_buffers = 4usize;
328 debug!("Stream requires {} capture buffers", min_num_buffers);
329
330 let visible_rect = capture_queue.get_selection(SelectionTarget::Compose)?;
331 debug!(
332 "Visible rectangle: ({}, {}), {}x{}",
333 visible_rect.left, visible_rect.top, visible_rect.width, visible_rect.height
334 );
335
336 // Let the client adjust the new format and give us the handles provider.
337 let FormatChangedReply {
338 provider,
339 mem_type,
340 num_buffers,
341 } = (self.set_capture_format_cb)(
342 capture_queue.change_format()?,
343 visible_rect,
344 min_num_buffers,
345 )?;
346
347 debug!("Client requires {} capture buffers", num_buffers);
348
349 // Allocate the new CAPTURE buffers and get ourselves a new waker for
350 // returning buffers.
351 let capture_queue =
352 capture_queue.request_buffers_generic::<P::HandleType>(mem_type, num_buffers as u32)?;
353 let cap_buffer_waker = self
354 .poller
355 .add_waker(CAPTURE_READY)
356 .map_err(UpdateCaptureError::AddWaker)?;
357
358 // Ready to decode - signal the waker so we immediately enqueue buffers
359 // and start streaming.
360 cap_buffer_waker.wake_by_ref();
361 capture_queue.stream_on()?;
362
363 Ok(Self {
364 capture_queue: CaptureQueue::Decoding {
365 capture_queue,
366 provider,
367 cap_buffer_waker,
368 blocking_drain_in_progress: false,
369 },
370 ..self
371 })
372 }
373
374 /// Attempt to dequeue and process a single CAPTURE buffer.
375 ///
376 /// If a buffer can be dequeued, then the following processing takes place:
377 /// * Invoke the event callback with a `FrameDecoded` event containing the
378 /// dequeued buffer,
379 /// * If the buffer has the LAST flag set:
380 /// * If a resolution change event is pending, start the resolution change
381 /// procedure,
382 /// * If a resolution change event is not pending, invoke the event
383 /// callback with an 'EndOfStream` event,
384 /// * If a blocking drain was in progress, complete it.
dequeue_capture_buffer(mut self) -> Self385 fn dequeue_capture_buffer(mut self) -> Self {
386 trace!("Dequeueing decoded CAPTURE buffers");
387 let (capture_queue, cap_buffer_waker, blocking_drain_in_progress) =
388 match &mut self.capture_queue {
389 CaptureQueue::AwaitingResolution { .. } => unreachable!(),
390 CaptureQueue::Decoding {
391 capture_queue,
392 cap_buffer_waker,
393 blocking_drain_in_progress,
394 ..
395 } => (capture_queue, cap_buffer_waker, blocking_drain_in_progress),
396 };
397
398 let mut cap_buf = match capture_queue.try_dequeue() {
399 Ok(cap_buf) => cap_buf,
400 Err(e) => {
401 warn!(
402 "Expected a CAPTURE buffer but none available, possible driver bug: {}",
403 e
404 );
405 return self;
406 }
407 };
408
409 let is_last = cap_buf.data.is_last();
410
411 // Add a drop callback to the dequeued buffer so we
412 // re-queue it as soon as it is dropped.
413 let cap_waker = Arc::clone(cap_buffer_waker);
414 cap_buf.add_drop_callback(move |_dqbuf| {
415 // Intentionally ignore the result here.
416 cap_waker.wake();
417 });
418
419 // Pass buffers to the client
420 (self.event_cb)(DecoderEvent::FrameDecoded(cap_buf));
421
422 if is_last {
423 debug!("CAPTURE buffer marked with LAST flag");
424 if is_drc_event_pending(&self.device).unwrap() {
425 debug!("DRC event pending, updating CAPTURE format");
426 self = self.update_capture_format().unwrap()
427 }
428 // No DRC event pending, this is the end of the stream.
429 // We need to stop and restart the CAPTURE queue, otherwise
430 // it will keep signaling buffers as ready and dequeueing
431 // them will return `EPIPE`.
432 else {
433 debug!("No DRC event pending, restarting capture queue");
434 // We are supposed to be able to run the START command
435 // instead, but with vicodec the CAPTURE queue reports
436 // as ready in subsequent polls() and DQBUF returns
437 // -EPIPE...
438 capture_queue.stream_off().unwrap();
439 capture_queue.stream_on().unwrap();
440 (self.event_cb)(DecoderEvent::EndOfStream);
441 if *blocking_drain_in_progress {
442 debug!("Signaling end of blocking drain");
443 *blocking_drain_in_progress = false;
444 self.send_response(CaptureThreadResponse::DrainDone(Ok(true)));
445 }
446 }
447 }
448
449 self
450 }
451
run(mut self) -> Self452 pub(super) fn run(mut self) -> Self {
453 'mainloop: loop {
454 if let CaptureQueue::Decoding { capture_queue, .. } = &self.capture_queue {
455 match capture_queue.num_queued_buffers() {
456 // If there are no buffers on the CAPTURE queue, poll() will return
457 // immediately with EPOLLERR and we would loop indefinitely.
458 // Prevent this by temporarily disabling polling the CAPTURE queue
459 // in such cases.
460 0 => {
461 self.poller
462 .disable_event(DeviceEvent::CaptureReady)
463 .unwrap();
464 }
465 // If device polling was disabled and we have buffers queued, we
466 // can reenable it as poll will now wait for a CAPTURE buffer to
467 // be ready for dequeue.
468 _ => {
469 self.poller.enable_event(DeviceEvent::CaptureReady).unwrap();
470 }
471 }
472 }
473
474 trace!("Polling...");
475 let events = match self.poller.poll(None) {
476 Ok(events) => events,
477 Err(e) => {
478 error!("Polling failure, exiting capture thread: {}", e);
479 break 'mainloop;
480 }
481 };
482 for event in events {
483 self = match event {
484 PollEvent::Device(DeviceEvent::V4L2Event) => self.process_v4l2_event(),
485 PollEvent::Device(DeviceEvent::CaptureReady) => self.dequeue_capture_buffer(),
486 PollEvent::Waker(CAPTURE_READY) => {
487 self.enqueue_capture_buffers();
488 self
489 }
490 PollEvent::Waker(COMMAND_WAITING) => {
491 loop {
492 let command =
493 match self.command_receiver.recv_timeout(Default::default()) {
494 Ok(command) => command,
495 Err(mpsc::RecvTimeoutError::Timeout) => break,
496 Err(e) => {
497 error!("Error while reading decoder command: {}", e);
498 break;
499 }
500 };
501 match command {
502 DecoderCommand::Drain(blocking) => self.drain(blocking),
503 DecoderCommand::Flush => self.flush(),
504 DecoderCommand::Stop => {
505 trace!("Processing stop command");
506 break 'mainloop;
507 }
508 }
509 }
510 self
511 }
512 _ => panic!("Unexpected event!"),
513 }
514 }
515 }
516
517 // Return the decoder to the awaiting resolution state.
518 match self.capture_queue {
519 CaptureQueue::AwaitingResolution { .. } => self,
520 CaptureQueue::Decoding { capture_queue, .. } => Self {
521 capture_queue: CaptureQueue::AwaitingResolution {
522 capture_queue: {
523 capture_queue.stream_off().unwrap();
524 capture_queue.free_buffers().unwrap().queue
525 },
526 },
527 poller: {
528 let mut poller = self.poller;
529 poller.disable_event(DeviceEvent::CaptureReady).unwrap();
530 poller.enable_event(DeviceEvent::V4L2Event).unwrap();
531 poller.remove_waker(CAPTURE_READY).unwrap();
532 poller
533 },
534 ..self
535 },
536 }
537 }
538 }
539