xref: /aosp_15_r20/external/crosvm/devices/src/virtio/video/worker.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Worker that runs in a virtio-video thread.
6 
7 use std::collections::VecDeque;
8 use std::time::Duration;
9 
10 use base::clone_descriptor;
11 use base::error;
12 use base::info;
13 use base::Event;
14 use base::WaitContext;
15 use cros_async::select3;
16 use cros_async::AsyncWrapper;
17 use cros_async::EventAsync;
18 use cros_async::Executor;
19 use cros_async::SelectResult;
20 use futures::FutureExt;
21 
22 use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
23 use crate::virtio::video::command::QueueType;
24 use crate::virtio::video::command::VideoCmd;
25 use crate::virtio::video::device::AsyncCmdResponse;
26 use crate::virtio::video::device::AsyncCmdTag;
27 use crate::virtio::video::device::Device;
28 use crate::virtio::video::device::Token;
29 use crate::virtio::video::device::VideoCmdResponseType;
30 use crate::virtio::video::device::VideoEvtResponseType;
31 use crate::virtio::video::event;
32 use crate::virtio::video::event::EvtType;
33 use crate::virtio::video::event::VideoEvt;
34 use crate::virtio::video::response;
35 use crate::virtio::video::response::Response;
36 use crate::virtio::video::Error;
37 use crate::virtio::video::Result;
38 use crate::virtio::DescriptorChain;
39 use crate::virtio::Queue;
40 
41 /// Worker that takes care of running the virtio video device.
42 pub struct Worker {
43     /// VirtIO queue for Command queue
44     cmd_queue: Queue,
45     /// VirtIO queue for Event queue
46     event_queue: Queue,
47     /// Stores descriptor chains in which responses for asynchronous commands will be written
48     desc_map: AsyncCmdDescMap,
49 }
50 
51 /// Pair of a descriptor chain and a response to be written.
52 type WritableResp = (DescriptorChain, response::CmdResponse);
53 
54 impl Worker {
new(cmd_queue: Queue, event_queue: Queue) -> Self55     pub fn new(cmd_queue: Queue, event_queue: Queue) -> Self {
56         Self {
57             cmd_queue,
58             event_queue,
59             desc_map: Default::default(),
60         }
61     }
62 
63     /// Writes responses into the command queue.
write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()>64     fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> {
65         if responses.is_empty() {
66             return Ok(());
67         }
68         while let Some((mut desc, response)) = responses.pop_front() {
69             if let Err(e) = response.write(&mut desc.writer) {
70                 error!(
71                     "failed to write a command response for {:?}: {}",
72                     response, e
73                 );
74             }
75             let len = desc.writer.bytes_written() as u32;
76             self.cmd_queue.add_used(desc, len);
77         }
78         self.cmd_queue.trigger_interrupt();
79         Ok(())
80     }
81 
82     /// Writes a `VideoEvt` into the event queue.
write_event(&mut self, event: event::VideoEvt) -> Result<()>83     fn write_event(&mut self, event: event::VideoEvt) -> Result<()> {
84         let mut desc = self
85             .event_queue
86             .pop()
87             .ok_or(Error::DescriptorNotAvailable)?;
88 
89         event
90             .write(&mut desc.writer)
91             .map_err(|error| Error::WriteEventFailure { event, error })?;
92         let len = desc.writer.bytes_written() as u32;
93         self.event_queue.add_used(desc, len);
94         self.event_queue.trigger_interrupt();
95         Ok(())
96     }
97 
98     /// Writes the `event_responses` into the command queue or the event queue according to
99     /// each response's type.
100     ///
101     /// # Arguments
102     ///
103     /// * `event_responses` - Responses to write
104     /// * `stream_id` - Stream session ID of the responses
write_event_responses( &mut self, event_responses: Vec<VideoEvtResponseType>, stream_id: u32, ) -> Result<()>105     fn write_event_responses(
106         &mut self,
107         event_responses: Vec<VideoEvtResponseType>,
108         stream_id: u32,
109     ) -> Result<()> {
110         let mut responses: VecDeque<WritableResp> = Default::default();
111         for event_response in event_responses {
112             match event_response {
113                 VideoEvtResponseType::AsyncCmd(async_response) => {
114                     let AsyncCmdResponse {
115                         tag,
116                         response: cmd_result,
117                     } = async_response;
118                     match self.desc_map.remove(&tag) {
119                         Some(desc) => {
120                             let cmd_response = match cmd_result {
121                                 Ok(r) => r,
122                                 Err(e) => {
123                                     error!("returning async error response: {}", &e);
124                                     e.into()
125                                 }
126                             };
127                             responses.push_back((desc, cmd_response))
128                         }
129                         None => match tag {
130                             // TODO(b/153406792): Drain is cancelled by clearing either of the
131                             // stream's queues. To work around a limitation in the VDA api, the
132                             // output queue is cleared synchronously without going through VDA.
133                             // Because of this, the cancellation response from VDA for the
134                             // input queue might fail to find the drain's AsyncCmdTag.
135                             AsyncCmdTag::Drain { stream_id: _ } => {
136                                 info!("ignoring unknown drain response");
137                             }
138                             _ => {
139                                 error!("dropping response for an untracked command: {:?}", tag);
140                             }
141                         },
142                     }
143                 }
144                 VideoEvtResponseType::Event(evt) => {
145                     self.write_event(evt)?;
146                 }
147             }
148         }
149 
150         if let Err(e) = self.write_responses(&mut responses) {
151             error!("Failed to write event responses: {:?}", e);
152             // Ignore result of write_event for a fatal error.
153             let _ = self.write_event(VideoEvt {
154                 typ: EvtType::Error,
155                 stream_id,
156             });
157             return Err(e);
158         }
159 
160         Ok(())
161     }
162 
163     /// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque`
164     /// of `WritableResp` to be sent to the guest.
165     ///
166     /// # Arguments
167     ///
168     /// * `device` - Instance of backend device
169     /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
170     ///   `wait_ctx`
171     /// * `desc` - `DescriptorChain` to handle
handle_command_desc( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, mut desc: DescriptorChain, ) -> Result<VecDeque<WritableResp>>172     fn handle_command_desc(
173         &mut self,
174         device: &mut dyn Device,
175         wait_ctx: &WaitContext<Token>,
176         mut desc: DescriptorChain,
177     ) -> Result<VecDeque<WritableResp>> {
178         let mut responses: VecDeque<WritableResp> = Default::default();
179         let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?;
180 
181         // If a destruction command comes, cancel pending requests.
182         // TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this
183         // into encoder/decoder.
184         let async_responses = match cmd {
185             VideoCmd::ResourceDestroyAll {
186                 stream_id,
187                 queue_type,
188             } => self
189                 .desc_map
190                 .create_cancellation_responses(&stream_id, Some(queue_type), None),
191             VideoCmd::StreamDestroy { stream_id } => self
192                 .desc_map
193                 .create_cancellation_responses(&stream_id, None, None),
194             VideoCmd::QueueClear {
195                 stream_id,
196                 queue_type: QueueType::Output,
197             } => {
198                 // TODO(b/153406792): Due to a workaround for a limitation in the VDA api,
199                 // clearing the output queue doesn't go through the same Async path as clearing
200                 // the input queue. However, we still need to cancel the pending resources.
201                 self.desc_map.create_cancellation_responses(
202                     &stream_id,
203                     Some(QueueType::Output),
204                     None,
205                 )
206             }
207             _ => Default::default(),
208         };
209         for async_response in async_responses {
210             let AsyncCmdResponse {
211                 tag,
212                 response: cmd_result,
213             } = async_response;
214             let destroy_response = match cmd_result {
215                 Ok(r) => r,
216                 Err(e) => {
217                     error!("returning async error response: {}", &e);
218                     e.into()
219                 }
220             };
221             match self.desc_map.remove(&tag) {
222                 Some(destroy_desc) => {
223                     responses.push_back((destroy_desc, destroy_response));
224                 }
225                 None => error!("dropping response for an untracked command: {:?}", tag),
226             }
227         }
228 
229         // Process the command by the device.
230         let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx);
231         match cmd_response {
232             VideoCmdResponseType::Sync(r) => {
233                 responses.push_back((desc, r));
234             }
235             VideoCmdResponseType::Async(tag) => {
236                 // If the command expects an asynchronous response,
237                 // store `desc` to use it after the back-end device notifies the
238                 // completion.
239                 self.desc_map.insert(tag, desc);
240             }
241         }
242         if let Some((stream_id, event_responses)) = event_responses_with_id {
243             self.write_event_responses(event_responses, stream_id)?;
244         }
245 
246         Ok(responses)
247     }
248 
249     /// Handles each command in the command queue.
250     ///
251     /// # Arguments
252     ///
253     /// * `device` - Instance of backend device
254     /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
255     ///   `wait_ctx`
handle_command_queue( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, ) -> Result<()>256     fn handle_command_queue(
257         &mut self,
258         device: &mut dyn Device,
259         wait_ctx: &WaitContext<Token>,
260     ) -> Result<()> {
261         while let Some(desc) = self.cmd_queue.pop() {
262             let mut resps = self.handle_command_desc(device, wait_ctx, desc)?;
263             self.write_responses(&mut resps)?;
264         }
265         Ok(())
266     }
267 
268     /// Handles an event notified via an event.
269     ///
270     /// # Arguments
271     ///
272     /// * `device` - Instance of backend device
273     /// * `stream_id` - Stream session ID of the event
274     /// * `wait_ctx` - `device` may register a new `Token::Buffer` for a new stream session to
275     ///   `wait_ctx`
handle_event( &mut self, device: &mut dyn Device, stream_id: u32, wait_ctx: &WaitContext<Token>, ) -> Result<()>276     fn handle_event(
277         &mut self,
278         device: &mut dyn Device,
279         stream_id: u32,
280         wait_ctx: &WaitContext<Token>,
281     ) -> Result<()> {
282         if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx)
283         {
284             self.write_event_responses(event_responses, stream_id)?;
285         }
286         Ok(())
287     }
288 
289     /// Handles a completed buffer barrier.
290     ///
291     /// # Arguments
292     ///
293     /// * `device` - Instance of backend device
294     /// * `stream_id` - Stream session ID of the event
295     /// * `wait_ctx` - `device` may deregister the completed `Token::BufferBarrier` from `wait_ctx`.
handle_buffer_barrier( &mut self, device: &mut dyn Device, stream_id: u32, wait_ctx: &WaitContext<Token>, ) -> Result<()>296     fn handle_buffer_barrier(
297         &mut self,
298         device: &mut dyn Device,
299         stream_id: u32,
300         wait_ctx: &WaitContext<Token>,
301     ) -> Result<()> {
302         if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) {
303             self.write_event_responses(event_responses, stream_id)?;
304         }
305         Ok(())
306     }
307 
308     /// Runs the video device virtio queues in a blocking way.
309     ///
310     /// # Arguments
311     ///
312     /// * `device` - Instance of backend device
313     /// * `kill_evt` - `Event` notified to make `run` stop and return
run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()>314     pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> {
315         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
316             (self.cmd_queue.event(), Token::CmdQueue),
317             (self.event_queue.event(), Token::EventQueue),
318             (kill_evt, Token::Kill),
319         ])
320         .and_then(|wc| {
321             // resampling event exists per-PCI-INTx basis, so the two queues have the same event.
322             // Thus, checking only cmd_queue_interrupt suffices.
323             if let Some(resample_evt) = self.cmd_queue.interrupt().get_resample_evt() {
324                 wc.add(resample_evt, Token::InterruptResample)?;
325             }
326             Ok(wc)
327         })
328         .map_err(Error::WaitContextCreationFailed)?;
329 
330         loop {
331             let wait_events = wait_ctx.wait().map_err(Error::WaitError)?;
332 
333             for wait_event in wait_events.iter().filter(|e| e.is_readable) {
334                 match wait_event.token {
335                     Token::CmdQueue => {
336                         let _ = self.cmd_queue.event().wait();
337                         self.handle_command_queue(device.as_mut(), &wait_ctx)?;
338                     }
339                     Token::EventQueue => {
340                         let _ = self.event_queue.event().wait();
341                     }
342                     Token::Event { id } => {
343                         self.handle_event(device.as_mut(), id, &wait_ctx)?;
344                     }
345                     Token::BufferBarrier { id } => {
346                         self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?;
347                     }
348                     Token::InterruptResample => {
349                         // Clear the event. `expect` is ok since the token fires if and only if
350                         // resample exists. resampling event exists per-PCI-INTx basis, so the
351                         // two queues have the same event.
352                         let _ = self
353                             .cmd_queue
354                             .interrupt()
355                             .get_resample_evt()
356                             .expect("resample event for the command queue doesn't exist")
357                             .wait();
358                         self.cmd_queue.interrupt().do_interrupt_resample();
359                     }
360                     Token::Kill => return Ok(()),
361                 }
362             }
363         }
364     }
365 
366     /// Runs the video device virtio queues asynchronously.
367     ///
368     /// # Arguments
369     ///
370     /// * `device` - Instance of backend device
371     /// * `ex` - Instance of `Executor` of asynchronous operations
372     /// * `cmd_evt` - Driver-to-device kick event for the command queue
373     /// * `event_evt` - Driver-to-device kick event for the event queue
374     #[allow(dead_code)]
run_async( mut self, mut device: Box<dyn Device>, ex: Executor, cmd_evt: Event, event_evt: Event, ) -> Result<()>375     pub async fn run_async(
376         mut self,
377         mut device: Box<dyn Device>,
378         ex: Executor,
379         cmd_evt: Event,
380         event_evt: Event,
381     ) -> Result<()> {
382         let cmd_queue_evt =
383             EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
384         let event_queue_evt =
385             EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
386 
387         // WaitContext to wait for the response from the encoder/decoder device.
388         let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?;
389         let device_evt = ex
390             .async_from(AsyncWrapper::new(
391                 clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?,
392             ))
393             .map_err(Error::EventAsyncCreationFailed)?;
394 
395         loop {
396             let (
397                 cmd_queue_evt,
398                 device_evt,
399                 // Ignore driver-to-device kicks since the event queue is write-only for a device.
400                 _event_queue_evt,
401             ) = select3(
402                 cmd_queue_evt.next_val().boxed_local(),
403                 device_evt.wait_readable().boxed_local(),
404                 event_queue_evt.next_val().boxed_local(),
405             )
406             .await;
407 
408             if let SelectResult::Finished(_) = cmd_queue_evt {
409                 self.handle_command_queue(device.as_mut(), &device_wait_ctx)?;
410             }
411 
412             if let SelectResult::Finished(_) = device_evt {
413                 let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) {
414                     Ok(device_events) => device_events,
415                     Err(_) => {
416                         error!("failed to read a device event");
417                         continue;
418                     }
419                 };
420                 for device_event in device_events {
421                     // A Device must trigger only Token::Event. See [`Device::process_cmd()`].
422                     if let Token::Event { id } = device_event.token {
423                         self.handle_event(device.as_mut(), id, &device_wait_ctx)?;
424                     } else {
425                         error!(
426                             "invalid event is triggered by a device {:?}",
427                             device_event.token
428                         );
429                     }
430                 }
431             }
432         }
433     }
434 }
435