1 // Copyright 2020 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! Worker that runs in a virtio-video thread. 6 7 use std::collections::VecDeque; 8 use std::time::Duration; 9 10 use base::clone_descriptor; 11 use base::error; 12 use base::info; 13 use base::Event; 14 use base::WaitContext; 15 use cros_async::select3; 16 use cros_async::AsyncWrapper; 17 use cros_async::EventAsync; 18 use cros_async::Executor; 19 use cros_async::SelectResult; 20 use futures::FutureExt; 21 22 use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap; 23 use crate::virtio::video::command::QueueType; 24 use crate::virtio::video::command::VideoCmd; 25 use crate::virtio::video::device::AsyncCmdResponse; 26 use crate::virtio::video::device::AsyncCmdTag; 27 use crate::virtio::video::device::Device; 28 use crate::virtio::video::device::Token; 29 use crate::virtio::video::device::VideoCmdResponseType; 30 use crate::virtio::video::device::VideoEvtResponseType; 31 use crate::virtio::video::event; 32 use crate::virtio::video::event::EvtType; 33 use crate::virtio::video::event::VideoEvt; 34 use crate::virtio::video::response; 35 use crate::virtio::video::response::Response; 36 use crate::virtio::video::Error; 37 use crate::virtio::video::Result; 38 use crate::virtio::DescriptorChain; 39 use crate::virtio::Queue; 40 41 /// Worker that takes care of running the virtio video device. 42 pub struct Worker { 43 /// VirtIO queue for Command queue 44 cmd_queue: Queue, 45 /// VirtIO queue for Event queue 46 event_queue: Queue, 47 /// Stores descriptor chains in which responses for asynchronous commands will be written 48 desc_map: AsyncCmdDescMap, 49 } 50 51 /// Pair of a descriptor chain and a response to be written. 52 type WritableResp = (DescriptorChain, response::CmdResponse); 53 54 impl Worker { new(cmd_queue: Queue, event_queue: Queue) -> Self55 pub fn new(cmd_queue: Queue, event_queue: Queue) -> Self { 56 Self { 57 cmd_queue, 58 event_queue, 59 desc_map: Default::default(), 60 } 61 } 62 63 /// Writes responses into the command queue. write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()>64 fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> { 65 if responses.is_empty() { 66 return Ok(()); 67 } 68 while let Some((mut desc, response)) = responses.pop_front() { 69 if let Err(e) = response.write(&mut desc.writer) { 70 error!( 71 "failed to write a command response for {:?}: {}", 72 response, e 73 ); 74 } 75 let len = desc.writer.bytes_written() as u32; 76 self.cmd_queue.add_used(desc, len); 77 } 78 self.cmd_queue.trigger_interrupt(); 79 Ok(()) 80 } 81 82 /// Writes a `VideoEvt` into the event queue. write_event(&mut self, event: event::VideoEvt) -> Result<()>83 fn write_event(&mut self, event: event::VideoEvt) -> Result<()> { 84 let mut desc = self 85 .event_queue 86 .pop() 87 .ok_or(Error::DescriptorNotAvailable)?; 88 89 event 90 .write(&mut desc.writer) 91 .map_err(|error| Error::WriteEventFailure { event, error })?; 92 let len = desc.writer.bytes_written() as u32; 93 self.event_queue.add_used(desc, len); 94 self.event_queue.trigger_interrupt(); 95 Ok(()) 96 } 97 98 /// Writes the `event_responses` into the command queue or the event queue according to 99 /// each response's type. 100 /// 101 /// # Arguments 102 /// 103 /// * `event_responses` - Responses to write 104 /// * `stream_id` - Stream session ID of the responses write_event_responses( &mut self, event_responses: Vec<VideoEvtResponseType>, stream_id: u32, ) -> Result<()>105 fn write_event_responses( 106 &mut self, 107 event_responses: Vec<VideoEvtResponseType>, 108 stream_id: u32, 109 ) -> Result<()> { 110 let mut responses: VecDeque<WritableResp> = Default::default(); 111 for event_response in event_responses { 112 match event_response { 113 VideoEvtResponseType::AsyncCmd(async_response) => { 114 let AsyncCmdResponse { 115 tag, 116 response: cmd_result, 117 } = async_response; 118 match self.desc_map.remove(&tag) { 119 Some(desc) => { 120 let cmd_response = match cmd_result { 121 Ok(r) => r, 122 Err(e) => { 123 error!("returning async error response: {}", &e); 124 e.into() 125 } 126 }; 127 responses.push_back((desc, cmd_response)) 128 } 129 None => match tag { 130 // TODO(b/153406792): Drain is cancelled by clearing either of the 131 // stream's queues. To work around a limitation in the VDA api, the 132 // output queue is cleared synchronously without going through VDA. 133 // Because of this, the cancellation response from VDA for the 134 // input queue might fail to find the drain's AsyncCmdTag. 135 AsyncCmdTag::Drain { stream_id: _ } => { 136 info!("ignoring unknown drain response"); 137 } 138 _ => { 139 error!("dropping response for an untracked command: {:?}", tag); 140 } 141 }, 142 } 143 } 144 VideoEvtResponseType::Event(evt) => { 145 self.write_event(evt)?; 146 } 147 } 148 } 149 150 if let Err(e) = self.write_responses(&mut responses) { 151 error!("Failed to write event responses: {:?}", e); 152 // Ignore result of write_event for a fatal error. 153 let _ = self.write_event(VideoEvt { 154 typ: EvtType::Error, 155 stream_id, 156 }); 157 return Err(e); 158 } 159 160 Ok(()) 161 } 162 163 /// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque` 164 /// of `WritableResp` to be sent to the guest. 165 /// 166 /// # Arguments 167 /// 168 /// * `device` - Instance of backend device 169 /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to 170 /// `wait_ctx` 171 /// * `desc` - `DescriptorChain` to handle handle_command_desc( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, mut desc: DescriptorChain, ) -> Result<VecDeque<WritableResp>>172 fn handle_command_desc( 173 &mut self, 174 device: &mut dyn Device, 175 wait_ctx: &WaitContext<Token>, 176 mut desc: DescriptorChain, 177 ) -> Result<VecDeque<WritableResp>> { 178 let mut responses: VecDeque<WritableResp> = Default::default(); 179 let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?; 180 181 // If a destruction command comes, cancel pending requests. 182 // TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this 183 // into encoder/decoder. 184 let async_responses = match cmd { 185 VideoCmd::ResourceDestroyAll { 186 stream_id, 187 queue_type, 188 } => self 189 .desc_map 190 .create_cancellation_responses(&stream_id, Some(queue_type), None), 191 VideoCmd::StreamDestroy { stream_id } => self 192 .desc_map 193 .create_cancellation_responses(&stream_id, None, None), 194 VideoCmd::QueueClear { 195 stream_id, 196 queue_type: QueueType::Output, 197 } => { 198 // TODO(b/153406792): Due to a workaround for a limitation in the VDA api, 199 // clearing the output queue doesn't go through the same Async path as clearing 200 // the input queue. However, we still need to cancel the pending resources. 201 self.desc_map.create_cancellation_responses( 202 &stream_id, 203 Some(QueueType::Output), 204 None, 205 ) 206 } 207 _ => Default::default(), 208 }; 209 for async_response in async_responses { 210 let AsyncCmdResponse { 211 tag, 212 response: cmd_result, 213 } = async_response; 214 let destroy_response = match cmd_result { 215 Ok(r) => r, 216 Err(e) => { 217 error!("returning async error response: {}", &e); 218 e.into() 219 } 220 }; 221 match self.desc_map.remove(&tag) { 222 Some(destroy_desc) => { 223 responses.push_back((destroy_desc, destroy_response)); 224 } 225 None => error!("dropping response for an untracked command: {:?}", tag), 226 } 227 } 228 229 // Process the command by the device. 230 let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx); 231 match cmd_response { 232 VideoCmdResponseType::Sync(r) => { 233 responses.push_back((desc, r)); 234 } 235 VideoCmdResponseType::Async(tag) => { 236 // If the command expects an asynchronous response, 237 // store `desc` to use it after the back-end device notifies the 238 // completion. 239 self.desc_map.insert(tag, desc); 240 } 241 } 242 if let Some((stream_id, event_responses)) = event_responses_with_id { 243 self.write_event_responses(event_responses, stream_id)?; 244 } 245 246 Ok(responses) 247 } 248 249 /// Handles each command in the command queue. 250 /// 251 /// # Arguments 252 /// 253 /// * `device` - Instance of backend device 254 /// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to 255 /// `wait_ctx` handle_command_queue( &mut self, device: &mut dyn Device, wait_ctx: &WaitContext<Token>, ) -> Result<()>256 fn handle_command_queue( 257 &mut self, 258 device: &mut dyn Device, 259 wait_ctx: &WaitContext<Token>, 260 ) -> Result<()> { 261 while let Some(desc) = self.cmd_queue.pop() { 262 let mut resps = self.handle_command_desc(device, wait_ctx, desc)?; 263 self.write_responses(&mut resps)?; 264 } 265 Ok(()) 266 } 267 268 /// Handles an event notified via an event. 269 /// 270 /// # Arguments 271 /// 272 /// * `device` - Instance of backend device 273 /// * `stream_id` - Stream session ID of the event 274 /// * `wait_ctx` - `device` may register a new `Token::Buffer` for a new stream session to 275 /// `wait_ctx` handle_event( &mut self, device: &mut dyn Device, stream_id: u32, wait_ctx: &WaitContext<Token>, ) -> Result<()>276 fn handle_event( 277 &mut self, 278 device: &mut dyn Device, 279 stream_id: u32, 280 wait_ctx: &WaitContext<Token>, 281 ) -> Result<()> { 282 if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx) 283 { 284 self.write_event_responses(event_responses, stream_id)?; 285 } 286 Ok(()) 287 } 288 289 /// Handles a completed buffer barrier. 290 /// 291 /// # Arguments 292 /// 293 /// * `device` - Instance of backend device 294 /// * `stream_id` - Stream session ID of the event 295 /// * `wait_ctx` - `device` may deregister the completed `Token::BufferBarrier` from `wait_ctx`. handle_buffer_barrier( &mut self, device: &mut dyn Device, stream_id: u32, wait_ctx: &WaitContext<Token>, ) -> Result<()>296 fn handle_buffer_barrier( 297 &mut self, 298 device: &mut dyn Device, 299 stream_id: u32, 300 wait_ctx: &WaitContext<Token>, 301 ) -> Result<()> { 302 if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) { 303 self.write_event_responses(event_responses, stream_id)?; 304 } 305 Ok(()) 306 } 307 308 /// Runs the video device virtio queues in a blocking way. 309 /// 310 /// # Arguments 311 /// 312 /// * `device` - Instance of backend device 313 /// * `kill_evt` - `Event` notified to make `run` stop and return run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()>314 pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> { 315 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[ 316 (self.cmd_queue.event(), Token::CmdQueue), 317 (self.event_queue.event(), Token::EventQueue), 318 (kill_evt, Token::Kill), 319 ]) 320 .and_then(|wc| { 321 // resampling event exists per-PCI-INTx basis, so the two queues have the same event. 322 // Thus, checking only cmd_queue_interrupt suffices. 323 if let Some(resample_evt) = self.cmd_queue.interrupt().get_resample_evt() { 324 wc.add(resample_evt, Token::InterruptResample)?; 325 } 326 Ok(wc) 327 }) 328 .map_err(Error::WaitContextCreationFailed)?; 329 330 loop { 331 let wait_events = wait_ctx.wait().map_err(Error::WaitError)?; 332 333 for wait_event in wait_events.iter().filter(|e| e.is_readable) { 334 match wait_event.token { 335 Token::CmdQueue => { 336 let _ = self.cmd_queue.event().wait(); 337 self.handle_command_queue(device.as_mut(), &wait_ctx)?; 338 } 339 Token::EventQueue => { 340 let _ = self.event_queue.event().wait(); 341 } 342 Token::Event { id } => { 343 self.handle_event(device.as_mut(), id, &wait_ctx)?; 344 } 345 Token::BufferBarrier { id } => { 346 self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?; 347 } 348 Token::InterruptResample => { 349 // Clear the event. `expect` is ok since the token fires if and only if 350 // resample exists. resampling event exists per-PCI-INTx basis, so the 351 // two queues have the same event. 352 let _ = self 353 .cmd_queue 354 .interrupt() 355 .get_resample_evt() 356 .expect("resample event for the command queue doesn't exist") 357 .wait(); 358 self.cmd_queue.interrupt().do_interrupt_resample(); 359 } 360 Token::Kill => return Ok(()), 361 } 362 } 363 } 364 } 365 366 /// Runs the video device virtio queues asynchronously. 367 /// 368 /// # Arguments 369 /// 370 /// * `device` - Instance of backend device 371 /// * `ex` - Instance of `Executor` of asynchronous operations 372 /// * `cmd_evt` - Driver-to-device kick event for the command queue 373 /// * `event_evt` - Driver-to-device kick event for the event queue 374 #[allow(dead_code)] run_async( mut self, mut device: Box<dyn Device>, ex: Executor, cmd_evt: Event, event_evt: Event, ) -> Result<()>375 pub async fn run_async( 376 mut self, 377 mut device: Box<dyn Device>, 378 ex: Executor, 379 cmd_evt: Event, 380 event_evt: Event, 381 ) -> Result<()> { 382 let cmd_queue_evt = 383 EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?; 384 let event_queue_evt = 385 EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?; 386 387 // WaitContext to wait for the response from the encoder/decoder device. 388 let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?; 389 let device_evt = ex 390 .async_from(AsyncWrapper::new( 391 clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?, 392 )) 393 .map_err(Error::EventAsyncCreationFailed)?; 394 395 loop { 396 let ( 397 cmd_queue_evt, 398 device_evt, 399 // Ignore driver-to-device kicks since the event queue is write-only for a device. 400 _event_queue_evt, 401 ) = select3( 402 cmd_queue_evt.next_val().boxed_local(), 403 device_evt.wait_readable().boxed_local(), 404 event_queue_evt.next_val().boxed_local(), 405 ) 406 .await; 407 408 if let SelectResult::Finished(_) = cmd_queue_evt { 409 self.handle_command_queue(device.as_mut(), &device_wait_ctx)?; 410 } 411 412 if let SelectResult::Finished(_) = device_evt { 413 let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) { 414 Ok(device_events) => device_events, 415 Err(_) => { 416 error!("failed to read a device event"); 417 continue; 418 } 419 }; 420 for device_event in device_events { 421 // A Device must trigger only Token::Event. See [`Device::process_cmd()`]. 422 if let Token::Event { id } = device_event.token { 423 self.handle_event(device.as_mut(), id, &device_wait_ctx)?; 424 } else { 425 error!( 426 "invalid event is triggered by a device {:?}", 427 device_event.token 428 ); 429 } 430 } 431 } 432 } 433 } 434 } 435