1 use crate::{
2     decoder::{
3         stateful::{CaptureThreadResponse, DecoderCommand, DecoderEvent, DrainError},
4         DecoderEventCallback, FormatChangedCallback, FormatChangedReply,
5     },
6     device::{
7         poller::{DeviceEvent, PollEvent, Poller, Waker},
8         queue::{
9             self,
10             direction::Capture,
11             handles_provider::HandlesProvider,
12             qbuf::{
13                 get_free::GetFreeCaptureBuffer, get_indexed::GetCaptureBufferByIndex,
14                 CaptureQueueable,
15             },
16             BuffersAllocated, Queue, QueueInit,
17         },
18         AllocatedQueue, Device, Stream, TryDequeue,
19     },
20     ioctl::{self, SelectionTarget},
21 };
22 
23 use std::{
24     io,
25     sync::{mpsc, Arc},
26     task::Wake,
27 };
28 
29 use log::{debug, error, trace, warn};
30 use thiserror::Error;
31 
32 /// Check if `device` has a dynamic resolution change event pending.
33 ///
34 /// Dequeues all pending V4L2 events and returns `true` if a
35 /// SRC_CHANGE_EVENT (indicating a format change on the CAPTURE queue) was
36 /// detected. This consumes all the event, meaning that if this method
37 /// returned `true` once it will return `false` until a new resolution
38 /// change happens in the stream.
is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError>39 fn is_drc_event_pending(device: &Device) -> Result<bool, ioctl::DqEventError> {
40     let mut drc_pending = false;
41 
42     loop {
43         // TODO what if we used an iterator here?
44         let event = match ioctl::dqevent(device) {
45             Ok(event) => event,
46             Err(ioctl::DqEventError::NotReady) => return Ok(drc_pending),
47             Err(e) => return Err(e),
48         };
49 
50         match event {
51             ioctl::Event::SrcChangeEvent(changes) => {
52                 if changes.contains(ioctl::SrcChanges::RESOLUTION) {
53                     debug!("Received resolution change event");
54                     drc_pending = true;
55                 }
56             }
57             ioctl::Event::Eos => {
58                 debug!("Received EOS event");
59             }
60         }
61     }
62 }
63 
64 enum CaptureQueue<P: HandlesProvider> {
65     AwaitingResolution {
66         capture_queue: Queue<Capture, QueueInit>,
67     },
68     Decoding {
69         capture_queue: Queue<Capture, BuffersAllocated<P::HandleType>>,
70         provider: P,
71         cap_buffer_waker: Arc<Waker>,
72         // TODO not super elegant...
73         blocking_drain_in_progress: bool,
74     },
75 }
76 
77 pub(super) struct CaptureThread<P, DecoderEventCb, FormatChangedCb>
78 where
79     P: HandlesProvider,
80     DecoderEventCb: DecoderEventCallback<P>,
81     FormatChangedCb: FormatChangedCallback<P>,
82 {
83     device: Arc<Device>,
84     capture_queue: CaptureQueue<P>,
85     pub(super) poller: Poller,
86 
87     event_cb: DecoderEventCb,
88     set_capture_format_cb: FormatChangedCb,
89 
90     // Waker signaled when the main thread has commands pending for us.
91     pub(super) command_waker: Arc<Waker>,
92     // Receiver we read commands from when `command_waker` is signaled.
93     command_receiver: mpsc::Receiver<DecoderCommand>,
94     // Sender we use to send status messages after receiving commands from the
95     // main thread.
96     response_sender: mpsc::Sender<CaptureThreadResponse>,
97 }
98 
99 #[derive(Debug, Error)]
100 enum UpdateCaptureError {
101     #[error("error while enabling poller events: {0}")]
102     PollerEvents(io::Error),
103     #[error("error while removing CAPTURE waker: {0}")]
104     RemoveWaker(io::Error),
105     #[error("error while stopping CAPTURE queue: {0}")]
106     Streamoff(#[from] ioctl::StreamOffError),
107     #[error("error while freeing CAPTURE buffers: {0}")]
108     FreeBuffers(#[from] ioctl::ReqbufsError),
109     #[error("error while obtaining CAPTURE format: {0}")]
110     GFmt(#[from] ioctl::GFmtError),
111     #[error("error while obtaining selection target from CAPTURE queue: {0}")]
112     GSelection(#[from] ioctl::GSelectionError),
113     #[error("error while running the CAPTURE format callback: {0}")]
114     Callback(#[from] anyhow::Error),
115     #[error("error while requesting CAPTURE buffers: {0}")]
116     RequestBuffers(#[from] queue::RequestBuffersError),
117     #[error("error while adding the CAPTURE buffer waker: {0}")]
118     AddWaker(io::Error),
119     #[error("error while streaming CAPTURE queue: {0}")]
120     StreamOn(#[from] ioctl::StreamOnError),
121 }
122 
123 const CAPTURE_READY: u32 = 1;
124 const COMMAND_WAITING: u32 = 2;
125 
126 #[derive(Debug, Error)]
127 enum ProcessEventsError {
128     #[error("error while dequeueing event")]
129     DqEvent(#[from] ioctl::DqEventError),
130     #[error("error while requesting buffers")]
131     RequestBuffers(#[from] queue::RequestBuffersError),
132     #[error("error while updating CAPTURE format")]
133     UpdateCapture(#[from] UpdateCaptureError),
134 }
135 
136 impl<P, DecoderEventCb, FormatChangedCb> CaptureThread<P, DecoderEventCb, FormatChangedCb>
137 where
138     P: HandlesProvider,
139     DecoderEventCb: DecoderEventCallback<P>,
140     FormatChangedCb: FormatChangedCallback<P>,
141     for<'a> Queue<Capture, BuffersAllocated<P::HandleType>>:
142         GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>,
143 {
new( device: &Arc<Device>, capture_queue: Queue<Capture, QueueInit>, event_cb: DecoderEventCb, set_capture_format_cb: FormatChangedCb, command_receiver: mpsc::Receiver<DecoderCommand>, response_sender: mpsc::Sender<CaptureThreadResponse>, ) -> io::Result<Self>144     pub(super) fn new(
145         device: &Arc<Device>,
146         capture_queue: Queue<Capture, QueueInit>,
147         event_cb: DecoderEventCb,
148         set_capture_format_cb: FormatChangedCb,
149         command_receiver: mpsc::Receiver<DecoderCommand>,
150         response_sender: mpsc::Sender<CaptureThreadResponse>,
151     ) -> io::Result<Self> {
152         // Start by only listening to V4L2 events in order to catch the initial
153         // resolution change, and to the stop waker in case the user had a
154         // change of heart about decoding something now.
155         let mut poller = Poller::new(Arc::clone(device))?;
156         poller.enable_event(DeviceEvent::V4L2Event)?;
157         let command_waker = poller.add_waker(COMMAND_WAITING)?;
158 
159         let decoder_thread = CaptureThread {
160             device: Arc::clone(device),
161             capture_queue: CaptureQueue::AwaitingResolution { capture_queue },
162             poller,
163             event_cb,
164             set_capture_format_cb,
165             command_waker,
166             command_receiver,
167             response_sender,
168         };
169 
170         Ok(decoder_thread)
171     }
172 
send_response(&self, response: CaptureThreadResponse)173     fn send_response(&self, response: CaptureThreadResponse) {
174         trace!("Sending response: {:?}", response);
175 
176         self.response_sender.send(response).unwrap();
177     }
178 
drain(&mut self, blocking: bool)179     fn drain(&mut self, blocking: bool) {
180         trace!("Processing Drain({}) command", blocking);
181         let response = match &mut self.capture_queue {
182             // We cannot initiate the flush sequence before receiving the initial
183             // resolution.
184             CaptureQueue::AwaitingResolution { .. } => {
185                 Some(CaptureThreadResponse::DrainDone(Err(DrainError::TryAgain)))
186             }
187             CaptureQueue::Decoding {
188                 blocking_drain_in_progress,
189                 ..
190             } => {
191                 // We can receive the LAST buffer, send the STOP command
192                 // and exit the loop once the buffer with the LAST tag is received.
193                 ioctl::decoder_cmd::<_, ()>(&*self.device, ioctl::DecoderCmd::stop()).unwrap();
194                 if blocking {
195                     // If we are blocking, we will send the answer when the drain
196                     // is completed.
197                     *blocking_drain_in_progress = true;
198                     None
199                 } else {
200                     // If not blocking, send the response now so the client can keep going.
201                     Some(CaptureThreadResponse::DrainDone(Ok(false)))
202                 }
203             }
204         };
205 
206         if let Some(response) = response {
207             self.send_response(response);
208         }
209     }
210 
flush(&mut self)211     fn flush(&mut self) {
212         trace!("Processing flush command");
213         match &mut self.capture_queue {
214             CaptureQueue::AwaitingResolution { .. } => {}
215             CaptureQueue::Decoding {
216                 capture_queue,
217                 blocking_drain_in_progress,
218                 ..
219             } => {
220                 // Stream the capture queue off and back on, dropping any queued
221                 // buffer, and making the decoder ready to work again if it was
222                 // halted.
223                 capture_queue.stream_off().unwrap();
224                 capture_queue.stream_on().unwrap();
225                 *blocking_drain_in_progress = false;
226             }
227         }
228 
229         self.send_response(CaptureThreadResponse::FlushDone(Ok(())));
230         self.enqueue_capture_buffers()
231     }
232 
enqueue_capture_buffers(&mut self)233     fn enqueue_capture_buffers(&mut self) {
234         trace!("Queueing available CAPTURE buffers");
235         let (capture_queue, provider, cap_buffer_waker) = match &mut self.capture_queue {
236             // Capture queue is not set up yet, no buffers to queue.
237             CaptureQueue::AwaitingResolution { .. } => return,
238             CaptureQueue::Decoding {
239                 capture_queue,
240                 provider,
241                 cap_buffer_waker,
242                 ..
243             } => (capture_queue, provider, cap_buffer_waker),
244         };
245 
246         // Requeue all available CAPTURE buffers.
247         'enqueue: while let Some(handles) = provider.get_handles(cap_buffer_waker) {
248             // TODO potential problem: the handles will be dropped if no V4L2 buffer
249             // is available. There is no guarantee that the provider will get them back
250             // in this case (e.g. with the C FFI).
251             let buffer = match provider.get_suitable_buffer_for(&handles, capture_queue) {
252                 Ok(buffer) => buffer,
253                 // It is possible that we run out of V4L2 buffers if there are more handles than
254                 // buffers allocated. One example of this scenario is the `MmapProvider` which has
255                 // an infinite number of handles. Break out of the loop when this happens - we will
256                 // be called again the next time a CAPTURE buffer becomes available.
257                 Err(queue::handles_provider::GetSuitableBufferError::TryGetFree(
258                     queue::qbuf::get_free::GetFreeBufferError::NoFreeBuffer,
259                 )) => {
260                     break 'enqueue;
261                 }
262                 Err(e) => {
263                     error!("Could not find suitable buffer for handles: {}", e);
264                     warn!("Handles potentially lost due to no V4L2 buffer being available");
265                     break 'enqueue;
266                 }
267             };
268             match buffer.queue_with_handles(handles) {
269                 Ok(()) => (),
270                 Err(e) => error!("Error while queueing CAPTURE buffer: {}", e),
271             }
272         }
273     }
274 
process_v4l2_event(mut self) -> Self275     fn process_v4l2_event(mut self) -> Self {
276         trace!("Processing V4L2 event");
277         match self.capture_queue {
278             CaptureQueue::AwaitingResolution { .. } => {
279                 if is_drc_event_pending(&self.device).unwrap() {
280                     self = self.update_capture_format().unwrap()
281                 }
282             }
283             CaptureQueue::Decoding { .. } => unreachable!(),
284         }
285 
286         self
287     }
288 
update_capture_format(mut self) -> Result<Self, UpdateCaptureError>289     fn update_capture_format(mut self) -> Result<Self, UpdateCaptureError> {
290         debug!("Updating CAPTURE format");
291         // First reset the capture queue to the `Init` state if needed.
292         let mut capture_queue = match self.capture_queue {
293             // Initial resolution
294             CaptureQueue::AwaitingResolution { capture_queue } => {
295                 // Stop listening to V4L2 events. We will check them when we get
296                 // a buffer with the LAST flag.
297                 self.poller
298                     .disable_event(DeviceEvent::V4L2Event)
299                     .map_err(Into::<io::Error>::into)
300                     .map_err(UpdateCaptureError::PollerEvents)?;
301                 // Listen to CAPTURE buffers being ready to dequeue, as we will
302                 // be streaming soon.
303                 self.poller
304                     .enable_event(DeviceEvent::CaptureReady)
305                     .map_err(Into::<io::Error>::into)
306                     .map_err(UpdateCaptureError::PollerEvents)?;
307                 capture_queue
308             }
309             // Dynamic resolution change
310             CaptureQueue::Decoding { capture_queue, .. } => {
311                 // Remove the waker for the previous buffers pool, as we will
312                 // get a new set of buffers.
313                 self.poller
314                     .remove_waker(CAPTURE_READY)
315                     .map_err(UpdateCaptureError::RemoveWaker)?;
316                 // Deallocate the queue and return it to the `Init` state. Good
317                 // as new!
318                 capture_queue.stream_off()?;
319                 capture_queue.free_buffers()?.queue
320             }
321         };
322 
323         // Now get the parameters of the new format and build our new CAPTURE
324         // queue.
325 
326         // TODO use the proper control to get the right value.
327         let min_num_buffers = 4usize;
328         debug!("Stream requires {} capture buffers", min_num_buffers);
329 
330         let visible_rect = capture_queue.get_selection(SelectionTarget::Compose)?;
331         debug!(
332             "Visible rectangle: ({}, {}), {}x{}",
333             visible_rect.left, visible_rect.top, visible_rect.width, visible_rect.height
334         );
335 
336         // Let the client adjust the new format and give us the handles provider.
337         let FormatChangedReply {
338             provider,
339             mem_type,
340             num_buffers,
341         } = (self.set_capture_format_cb)(
342             capture_queue.change_format()?,
343             visible_rect,
344             min_num_buffers,
345         )?;
346 
347         debug!("Client requires {} capture buffers", num_buffers);
348 
349         // Allocate the new CAPTURE buffers and get ourselves a new waker for
350         // returning buffers.
351         let capture_queue =
352             capture_queue.request_buffers_generic::<P::HandleType>(mem_type, num_buffers as u32)?;
353         let cap_buffer_waker = self
354             .poller
355             .add_waker(CAPTURE_READY)
356             .map_err(UpdateCaptureError::AddWaker)?;
357 
358         // Ready to decode - signal the waker so we immediately enqueue buffers
359         // and start streaming.
360         cap_buffer_waker.wake_by_ref();
361         capture_queue.stream_on()?;
362 
363         Ok(Self {
364             capture_queue: CaptureQueue::Decoding {
365                 capture_queue,
366                 provider,
367                 cap_buffer_waker,
368                 blocking_drain_in_progress: false,
369             },
370             ..self
371         })
372     }
373 
374     /// Attempt to dequeue and process a single CAPTURE buffer.
375     ///
376     /// If a buffer can be dequeued, then the following processing takes place:
377     /// * Invoke the event callback with a `FrameDecoded` event containing the
378     ///   dequeued buffer,
379     /// * If the buffer has the LAST flag set:
380     ///   * If a resolution change event is pending, start the resolution change
381     ///     procedure,
382     ///   * If a resolution change event is not pending, invoke the event
383     ///     callback with an 'EndOfStream` event,
384     ///   * If a blocking drain was in progress, complete it.
dequeue_capture_buffer(mut self) -> Self385     fn dequeue_capture_buffer(mut self) -> Self {
386         trace!("Dequeueing decoded CAPTURE buffers");
387         let (capture_queue, cap_buffer_waker, blocking_drain_in_progress) =
388             match &mut self.capture_queue {
389                 CaptureQueue::AwaitingResolution { .. } => unreachable!(),
390                 CaptureQueue::Decoding {
391                     capture_queue,
392                     cap_buffer_waker,
393                     blocking_drain_in_progress,
394                     ..
395                 } => (capture_queue, cap_buffer_waker, blocking_drain_in_progress),
396             };
397 
398         let mut cap_buf = match capture_queue.try_dequeue() {
399             Ok(cap_buf) => cap_buf,
400             Err(e) => {
401                 warn!(
402                     "Expected a CAPTURE buffer but none available, possible driver bug: {}",
403                     e
404                 );
405                 return self;
406             }
407         };
408 
409         let is_last = cap_buf.data.is_last();
410 
411         // Add a drop callback to the dequeued buffer so we
412         // re-queue it as soon as it is dropped.
413         let cap_waker = Arc::clone(cap_buffer_waker);
414         cap_buf.add_drop_callback(move |_dqbuf| {
415             // Intentionally ignore the result here.
416             cap_waker.wake();
417         });
418 
419         // Pass buffers to the client
420         (self.event_cb)(DecoderEvent::FrameDecoded(cap_buf));
421 
422         if is_last {
423             debug!("CAPTURE buffer marked with LAST flag");
424             if is_drc_event_pending(&self.device).unwrap() {
425                 debug!("DRC event pending, updating CAPTURE format");
426                 self = self.update_capture_format().unwrap()
427             }
428             // No DRC event pending, this is the end of the stream.
429             // We need to stop and restart the CAPTURE queue, otherwise
430             // it will keep signaling buffers as ready and dequeueing
431             // them will return `EPIPE`.
432             else {
433                 debug!("No DRC event pending, restarting capture queue");
434                 // We are supposed to be able to run the START command
435                 // instead, but with vicodec the CAPTURE queue reports
436                 // as ready in subsequent polls() and DQBUF returns
437                 // -EPIPE...
438                 capture_queue.stream_off().unwrap();
439                 capture_queue.stream_on().unwrap();
440                 (self.event_cb)(DecoderEvent::EndOfStream);
441                 if *blocking_drain_in_progress {
442                     debug!("Signaling end of blocking drain");
443                     *blocking_drain_in_progress = false;
444                     self.send_response(CaptureThreadResponse::DrainDone(Ok(true)));
445                 }
446             }
447         }
448 
449         self
450     }
451 
run(mut self) -> Self452     pub(super) fn run(mut self) -> Self {
453         'mainloop: loop {
454             if let CaptureQueue::Decoding { capture_queue, .. } = &self.capture_queue {
455                 match capture_queue.num_queued_buffers() {
456                     // If there are no buffers on the CAPTURE queue, poll() will return
457                     // immediately with EPOLLERR and we would loop indefinitely.
458                     // Prevent this by temporarily disabling polling the CAPTURE queue
459                     // in such cases.
460                     0 => {
461                         self.poller
462                             .disable_event(DeviceEvent::CaptureReady)
463                             .unwrap();
464                     }
465                     // If device polling was disabled and we have buffers queued, we
466                     // can reenable it as poll will now wait for a CAPTURE buffer to
467                     // be ready for dequeue.
468                     _ => {
469                         self.poller.enable_event(DeviceEvent::CaptureReady).unwrap();
470                     }
471                 }
472             }
473 
474             trace!("Polling...");
475             let events = match self.poller.poll(None) {
476                 Ok(events) => events,
477                 Err(e) => {
478                     error!("Polling failure, exiting capture thread: {}", e);
479                     break 'mainloop;
480                 }
481             };
482             for event in events {
483                 self = match event {
484                     PollEvent::Device(DeviceEvent::V4L2Event) => self.process_v4l2_event(),
485                     PollEvent::Device(DeviceEvent::CaptureReady) => self.dequeue_capture_buffer(),
486                     PollEvent::Waker(CAPTURE_READY) => {
487                         self.enqueue_capture_buffers();
488                         self
489                     }
490                     PollEvent::Waker(COMMAND_WAITING) => {
491                         loop {
492                             let command =
493                                 match self.command_receiver.recv_timeout(Default::default()) {
494                                     Ok(command) => command,
495                                     Err(mpsc::RecvTimeoutError::Timeout) => break,
496                                     Err(e) => {
497                                         error!("Error while reading decoder command: {}", e);
498                                         break;
499                                     }
500                                 };
501                             match command {
502                                 DecoderCommand::Drain(blocking) => self.drain(blocking),
503                                 DecoderCommand::Flush => self.flush(),
504                                 DecoderCommand::Stop => {
505                                     trace!("Processing stop command");
506                                     break 'mainloop;
507                                 }
508                             }
509                         }
510                         self
511                     }
512                     _ => panic!("Unexpected event!"),
513                 }
514             }
515         }
516 
517         // Return the decoder to the awaiting resolution state.
518         match self.capture_queue {
519             CaptureQueue::AwaitingResolution { .. } => self,
520             CaptureQueue::Decoding { capture_queue, .. } => Self {
521                 capture_queue: CaptureQueue::AwaitingResolution {
522                     capture_queue: {
523                         capture_queue.stream_off().unwrap();
524                         capture_queue.free_buffers().unwrap().queue
525                     },
526                 },
527                 poller: {
528                     let mut poller = self.poller;
529                     poller.disable_event(DeviceEvent::CaptureReady).unwrap();
530                     poller.enable_event(DeviceEvent::V4L2Event).unwrap();
531                     poller.remove_waker(CAPTURE_READY).unwrap();
532                     poller
533                 },
534                 ..self
535             },
536         }
537     }
538 }
539