xref: /aosp_15_r20/external/crosvm/src/crosvm/sys/linux/pci_hotplug_manager.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2023 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! A high-level manager for hotplug PCI devices.
6 
7 // TODO(b/243767476): Support aarch64.
8 use std::cmp::Ordering;
9 use std::collections::BTreeMap;
10 use std::collections::HashMap;
11 use std::collections::VecDeque;
12 use std::sync::mpsc;
13 use std::sync::Arc;
14 
15 use anyhow::anyhow;
16 use anyhow::bail;
17 use anyhow::Context;
18 use anyhow::Error;
19 use arch::RunnableLinuxVm;
20 use arch::VcpuArch;
21 use arch::VmArch;
22 use base::AsRawDescriptor;
23 use base::Event;
24 use base::EventToken;
25 use base::RawDescriptor;
26 use base::WaitContext;
27 use base::WorkerThread;
28 use devices::BusDevice;
29 use devices::HotPlugBus;
30 use devices::HotPlugKey;
31 use devices::IrqEventSource;
32 use devices::IrqLevelEvent;
33 use devices::PciAddress;
34 use devices::PciInterruptPin;
35 use devices::PciRootCommand;
36 use devices::ResourceCarrier;
37 use log::error;
38 use resources::SystemAllocator;
39 #[cfg(feature = "swap")]
40 use swap::SwapDeviceHelper;
41 use sync::Mutex;
42 use vm_memory::GuestMemory;
43 
44 use crate::crosvm::sys::linux::JailWarden;
45 use crate::crosvm::sys::linux::JailWardenImpl;
46 use crate::crosvm::sys::linux::PermissiveJailWarden;
47 use crate::Config;
48 
49 pub type Result<T> = std::result::Result<T, Error>;
50 
/// PciHotPlugManager manages hotplug ports, and handles PCI device hot plug and hot removal.
///
/// The manager owns the manager-side bookkeeping (`port_stubs`, `bus_address_map`) and reaches the
/// worker thread through `worker_client`, which is `None` until `set_rootbus_controller` has
/// been called.
pub struct PciHotPlugManager {
    /// map of ports managed
    port_stubs: BTreeMap<PciAddress, PortManagerStub>,
    /// map of downstream bus to upstream PCI address
    bus_address_map: BTreeMap<u8, PciAddress>,
    /// JailWarden for jailing hotplug devices
    jail_warden: Box<dyn JailWarden>,
    /// Client on Manager side of PciHotPlugWorker. `None` while no worker has been started.
    worker_client: Option<WorkerClient>,
}
62 
/// WorkerClient is a wrapper of the worker methods.
///
/// It bundles the manager-side ends of the command/response channels together with the event
/// used to wake the worker.
struct WorkerClient {
    /// event to signal control command is sent
    control_evt: Event,
    /// control channel to worker
    command_sender: mpsc::Sender<WorkerCommand>,
    /// response channel from worker
    response_receiver: mpsc::Receiver<WorkerResponse>,
    /// Handle returned by `WorkerThread::start`. It is never read; it is only stored alongside
    /// the client. NOTE(review): presumably dropping it stops the worker thread -- confirm
    /// against `base::WorkerThread`.
    _worker_thread: WorkerThread<Result<()>>,
}
73 
74 impl WorkerClient {
75     /// Constructs PciHotPlugWorker with its client.
new(rootbus_controller: mpsc::Sender<PciRootCommand>) -> Result<Self>76     fn new(rootbus_controller: mpsc::Sender<PciRootCommand>) -> Result<Self> {
77         let (command_sender, command_receiver) = mpsc::channel();
78         let (response_sender, response_receiver) = mpsc::channel();
79         let control_evt = Event::new()?;
80         let control_evt_cpy = control_evt.try_clone()?;
81         let worker_thread = WorkerThread::start("pcihp_mgr_workr", move |kill_evt| {
82             let mut worker = PciHotPlugWorker::new(
83                 rootbus_controller,
84                 command_receiver,
85                 response_sender,
86                 control_evt_cpy,
87                 &kill_evt,
88             )?;
89             worker.run(kill_evt).inspect_err(|e| {
90                 error!("Worker exited with error: {:?}", e);
91             })
92         });
93         Ok(WorkerClient {
94             control_evt,
95             command_sender,
96             response_receiver,
97             _worker_thread: worker_thread,
98         })
99     }
100 
101     /// Sends worker command, and wait for its response.
send_worker_command(&self, command: WorkerCommand) -> Result<WorkerResponse>102     fn send_worker_command(&self, command: WorkerCommand) -> Result<WorkerResponse> {
103         self.command_sender.send(command)?;
104         self.control_evt.signal()?;
105         Ok(self.response_receiver.recv()?)
106     }
107 }
108 
/// PortManagerStub is the manager-side copy of a port.
///
/// It records which downstream bus the port exposes and which hotplugged devices currently hold
/// recoverable resources.
struct PortManagerStub {
    /// index of downstream bus
    downstream_bus: u8,
    /// Map of hotplugged devices, and system resources that can be released when device is
    /// removed.
    devices: HashMap<PciAddress, RecoverableResource>,
}
117 
/// System resources that can be released when a hotplugged device is removed.
struct RecoverableResource {
    /// IRQ number recorded for the device.
    irq_num: u32,
    /// Level-triggered IRQ event recorded for the device.
    /// NOTE(review): presumably paired with `irq_num` -- confirm at the use sites.
    irq_evt: IrqLevelEvent,
}
123 
/// Control commands to worker.
///
/// Each command is answered with exactly one `WorkerResponse` unless the worker fails while
/// handling it.
enum WorkerCommand {
    /// Add port to the worker. Carries the upstream PCI address and the worker-side port stub.
    AddPort(PciAddress, PortWorkerStub),
    /// Get the state of the port.
    GetPortState(PciAddress),
    /// Get an empty port for hotplug. Returns the least port sorted by PortKey.
    GetEmptyPort,
    /// Signals hot plug on port. Changes an empty port to occupied.
    SignalHotPlug(SignalHotPlugCommand),
    /// Signals hot unplug on port. Changes an occupied port to empty.
    SignalHotUnplug(PciAddress),
}
137 
/// A guest device queued for hotplug, as seen by the worker.
#[derive(Clone)]
struct GuestDeviceStub {
    /// Address sent with `PciRootCommand::Add` and registered on the hotplug port.
    pci_addr: PciAddress,
    /// Key registered on the hotplug port together with `pci_addr`.
    key: HotPlugKey,
    /// The device handed to the root bus through `PciRootCommand::Add`.
    device: Arc<Mutex<dyn BusDevice>>,
}
144 
/// Payload of `WorkerCommand::SignalHotPlug`: the target port and the devices to plug into it.
#[derive(Clone)]
struct SignalHotPlugCommand {
    /// the upstream address of hotplug port
    upstream_address: PciAddress,
    /// the array of guest devices on the port
    guest_devices: Vec<GuestDeviceStub>,
}
152 
153 impl SignalHotPlugCommand {
new(upstream_address: PciAddress, guest_devices: Vec<GuestDeviceStub>) -> Result<Self>154     fn new(upstream_address: PciAddress, guest_devices: Vec<GuestDeviceStub>) -> Result<Self> {
155         if guest_devices.is_empty() {
156             bail!("No guest devices");
157         }
158         Ok(Self {
159             upstream_address,
160             guest_devices,
161         })
162     }
163 }
164 
/// PortWorkerStub is the worker-side copy of a port.
///
/// It queues batches of devices waiting to be plugged and remembers which addresses are
/// currently attached so they can be removed on unplug.
#[derive(Clone)]
struct PortWorkerStub {
    /// The downstream base address of the port. Needed to send plug and unplug signal.
    base_address: PciAddress,
    /// Currently attached devices that should be removed.
    attached_devices: Vec<PciAddress>,
    /// Devices to be added each time send_hot_plug_signal is called. Each queue entry is one
    /// batch; batches are consumed from the front and cancelled from the back.
    devices_to_add: VecDeque<Vec<GuestDeviceStub>>,
    /// hotplug port
    port: Arc<Mutex<dyn HotPlugBus>>,
}
177 
178 impl PortWorkerStub {
new(port: Arc<Mutex<dyn HotPlugBus>>, downstream_bus: u8) -> Result<Self>179     fn new(port: Arc<Mutex<dyn HotPlugBus>>, downstream_bus: u8) -> Result<Self> {
180         let base_address = PciAddress::new(0, downstream_bus.into(), 0, 0)?;
181         Ok(Self {
182             base_address,
183             devices_to_add: VecDeque::new(),
184             attached_devices: Vec::new(),
185             port,
186         })
187     }
188 
add_hotplug_devices(&mut self, devices: Vec<GuestDeviceStub>) -> Result<()>189     fn add_hotplug_devices(&mut self, devices: Vec<GuestDeviceStub>) -> Result<()> {
190         if devices.is_empty() {
191             bail!("No guest devices");
192         }
193         self.devices_to_add.push_back(devices);
194         Ok(())
195     }
196 
cancel_queued_add(&mut self) -> Result<()>197     fn cancel_queued_add(&mut self) -> Result<()> {
198         self.devices_to_add
199             .pop_back()
200             .context("No guest device add queued")?;
201         Ok(())
202     }
203 
send_hot_plug_signal( &mut self, rootbus_controller: &mpsc::Sender<PciRootCommand>, ) -> Result<Event>204     fn send_hot_plug_signal(
205         &mut self,
206         rootbus_controller: &mpsc::Sender<PciRootCommand>,
207     ) -> Result<Event> {
208         let mut port_lock = self.port.lock();
209         let devices = self
210             .devices_to_add
211             .pop_front()
212             .context("Missing devices to add")?;
213         for device in devices {
214             rootbus_controller.send(PciRootCommand::Add(device.pci_addr, device.device))?;
215             self.attached_devices.push(device.pci_addr);
216             port_lock.add_hotplug_device(device.key, device.pci_addr);
217         }
218         port_lock
219             .hot_plug(self.base_address)?
220             .context("hotplug bus does not support command complete notification")
221     }
222 
send_hot_unplug_signal( &mut self, rootbus_controller: &mpsc::Sender<PciRootCommand>, ) -> Result<Event>223     fn send_hot_unplug_signal(
224         &mut self,
225         rootbus_controller: &mpsc::Sender<PciRootCommand>,
226     ) -> Result<Event> {
227         for pci_addr in self.attached_devices.drain(..) {
228             rootbus_controller.send(PciRootCommand::Remove(pci_addr))?;
229         }
230         self.port
231             .lock()
232             .hot_unplug(self.base_address)?
233             .context("hotplug bus does not support command complete notification")
234     }
235 }
236 
/// Control response from worker.
///
/// `InvalidCommand` reports a rejected command; the worker keeps running after sending it.
#[derive(Debug)]
enum WorkerResponse {
    /// AddPort success.
    AddPortOk,
    /// GetEmptyPort success, use port at PciAddress.
    GetEmptyPortOk(PciAddress),
    /// GetPortState success. The "steps behind" field shall be considered expired, and the guest
    /// is "less than or equal to" n steps behind.
    GetPortStateOk(PortState),
    /// SignalHotPlug or SignalHotUnplug success.
    SignalOk,
    /// Command fail because it is not valid.
    InvalidCommand(Error),
}
252 
253 impl PartialEq for WorkerResponse {
eq(&self, other: &Self) -> bool254     fn eq(&self, other: &Self) -> bool {
255         match (self, other) {
256             (Self::GetEmptyPortOk(l0), Self::GetEmptyPortOk(r0)) => l0 == r0,
257             (Self::GetPortStateOk(l0), Self::GetPortStateOk(r0)) => l0 == r0,
258             (Self::InvalidCommand(_), Self::InvalidCommand(_)) => true,
259             _ => core::mem::discriminant(self) == core::mem::discriminant(other),
260         }
261     }
262 }
263 
/// Tokens registered on the worker's wait context.
///
/// The descriptor carried by the port variants is the raw descriptor of the registered event and
/// doubles as the key into the worker's `event_map`.
#[derive(Debug, EventToken)]
enum Token {
    /// The worker was asked to stop.
    Kill,
    /// The manager signaled that a command is waiting on the command channel.
    ManagerCommand,
    /// A port's ready notification fired.
    PortReady(RawDescriptor),
    /// A scheduled hot plug was reported complete.
    PlugComplete(RawDescriptor),
    /// A scheduled hot unplug was reported complete.
    UnplugComplete(RawDescriptor),
}
272 
/// PciHotPlugWorker is a worker that handles the asynchrony of slot states between crosvm and the
/// guest OS. It is responsible for scheduling the PCIe slot control signals and handle its result.
struct PciHotPlugWorker {
    /// Events currently registered on `wait_ctx`, keyed by raw descriptor, each paired with the
    /// address of the port it belongs to.
    event_map: BTreeMap<RawDescriptor, (Event, PciAddress)>,
    /// Current state of every port, keyed by the port's PCI address.
    port_state_map: BTreeMap<PciAddress, PortState>,
    /// Ports ordered by `PortKey`; the key embeds the port state, so it must be kept in sync with
    /// `port_state_map`.
    port_map: BTreeMap<PortKey, PortWorkerStub>,
    /// Event the manager signals after queueing a command.
    manager_evt: Event,
    /// Wait context multiplexing the manager, kill and per-port events.
    wait_ctx: WaitContext<Token>,
    /// Receiving end of the command channel from the manager.
    command_receiver: mpsc::Receiver<WorkerCommand>,
    /// Sending end of the response channel to the manager.
    response_sender: mpsc::Sender<WorkerResponse>,
    /// Channel used to add devices to and remove devices from the root bus.
    rootbus_controller: mpsc::Sender<PciRootCommand>,
}
285 
286 impl PciHotPlugWorker {
new( rootbus_controller: mpsc::Sender<PciRootCommand>, command_receiver: mpsc::Receiver<WorkerCommand>, response_sender: mpsc::Sender<WorkerResponse>, manager_evt: Event, kill_evt: &Event, ) -> Result<Self>287     fn new(
288         rootbus_controller: mpsc::Sender<PciRootCommand>,
289         command_receiver: mpsc::Receiver<WorkerCommand>,
290         response_sender: mpsc::Sender<WorkerResponse>,
291         manager_evt: Event,
292         kill_evt: &Event,
293     ) -> Result<Self> {
294         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
295             (&manager_evt, Token::ManagerCommand),
296             (kill_evt, Token::Kill),
297         ])?;
298         Ok(Self {
299             event_map: BTreeMap::new(),
300             port_state_map: BTreeMap::new(),
301             port_map: BTreeMap::new(),
302             manager_evt,
303             wait_ctx,
304             command_receiver,
305             response_sender,
306             rootbus_controller,
307         })
308     }
309 
    /// Starts the worker. Runs until received kill request, or an error that the worker is in an
    /// invalid state.
    ///
    /// Each iteration waits on the wait context and dispatches every readable event by token.
    /// Per-port events are one-shot: the event is taken out of `event_map` and deleted from the
    /// wait context before the matching handler runs. Any error from a handler ends the loop and
    /// is returned to the caller.
    fn run(&mut self, kill_evt: Event) -> Result<()> {
        'wait: loop {
            let events = self.wait_ctx.wait()?;
            for triggered_event in events.iter().filter(|e| e.is_readable) {
                match triggered_event.token {
                    Token::ManagerCommand => {
                        // Consume the wake-up before handling the queued command.
                        self.manager_evt.wait()?;
                        self.handle_manager_command()?;
                    }
                    Token::PortReady(descriptor) => {
                        let (event, pci_address) = self
                            .event_map
                            .remove(&descriptor)
                            .context("Cannot find event")?;
                        event.wait()?;
                        self.wait_ctx.delete(&event)?;
                        self.handle_port_ready(pci_address)?;
                    }
                    Token::PlugComplete(descriptor) => {
                        let (event, pci_address) = self
                            .event_map
                            .remove(&descriptor)
                            .context("Cannot find event")?;
                        event.wait()?;
                        self.wait_ctx.delete(&event)?;
                        self.handle_plug_complete(pci_address)?;
                    }
                    Token::UnplugComplete(descriptor) => {
                        // NOTE(review): unlike PortReady and PlugComplete, this arm does not call
                        // `event.wait()` before deleting the event. The event is dropped right
                        // after, so this looks harmless -- confirm the asymmetry is intended.
                        let (event, pci_address) = self
                            .event_map
                            .remove(&descriptor)
                            .context("Cannot find event")?;
                        self.wait_ctx.delete(&event)?;
                        self.handle_unplug_complete(pci_address)?;
                    }
                    Token::Kill => {
                        // Best effort: a failure to read the kill event is ignored.
                        let _ = kill_evt.wait();
                        break 'wait;
                    }
                }
            }
        }
        Ok(())
    }
356 
handle_manager_command(&mut self) -> Result<()>357     fn handle_manager_command(&mut self) -> Result<()> {
358         let response = match self.command_receiver.recv()? {
359             WorkerCommand::AddPort(pci_address, port) => self.handle_add_port(pci_address, port),
360             WorkerCommand::GetPortState(pci_address) => self.handle_get_port_state(pci_address),
361             WorkerCommand::GetEmptyPort => self.handle_get_empty_port(),
362             WorkerCommand::SignalHotPlug(hotplug_command) => {
363                 self.handle_plug_request(hotplug_command)
364             }
365             WorkerCommand::SignalHotUnplug(pci_address) => self.handle_unplug_request(pci_address),
366         }?;
367         Ok(self.response_sender.send(response)?)
368     }
369 
370     /// Handles add port: Initiate port in EmptyNotReady state.
handle_add_port( &mut self, pci_address: PciAddress, port: PortWorkerStub, ) -> Result<WorkerResponse>371     fn handle_add_port(
372         &mut self,
373         pci_address: PciAddress,
374         port: PortWorkerStub,
375     ) -> Result<WorkerResponse> {
376         if self.port_state_map.contains_key(&pci_address) {
377             return Ok(WorkerResponse::InvalidCommand(anyhow!(
378                 "Conflicting upstream PCI address"
379             )));
380         }
381         let port_state = PortState::EmptyNotReady;
382         let port_ready_event = port.port.lock().get_ready_notification()?;
383         self.wait_ctx.add(
384             &port_ready_event,
385             Token::PortReady(port_ready_event.as_raw_descriptor()),
386         )?;
387         self.event_map.insert(
388             port_ready_event.as_raw_descriptor(),
389             (port_ready_event, pci_address),
390         );
391         self.port_state_map.insert(pci_address, port_state);
392         self.port_map.insert(
393             PortKey {
394                 port_state,
395                 pci_address,
396             },
397             port,
398         );
399         Ok(WorkerResponse::AddPortOk)
400     }
401 
402     /// Handles get port state: returns the PortState.
handle_get_port_state(&self, pci_address: PciAddress) -> Result<WorkerResponse>403     fn handle_get_port_state(&self, pci_address: PciAddress) -> Result<WorkerResponse> {
404         match self.get_port_state(pci_address) {
405             Ok(ps) => Ok(WorkerResponse::GetPortStateOk(ps)),
406             Err(e) => Ok(WorkerResponse::InvalidCommand(e)),
407         }
408     }
409 
410     /// Handle getting empty port: Find the most empty port, or return error if all are occupied.
handle_get_empty_port(&self) -> Result<WorkerResponse>411     fn handle_get_empty_port(&self) -> Result<WorkerResponse> {
412         let most_empty_port = match self.port_map.first_key_value() {
413             Some(p) => p.0,
414             None => return Ok(WorkerResponse::InvalidCommand(anyhow!("No ports added"))),
415         };
416         match most_empty_port.port_state {
417             PortState::Empty(_) | PortState::EmptyNotReady => {
418                 Ok(WorkerResponse::GetEmptyPortOk(most_empty_port.pci_address))
419             }
420             PortState::Occupied(_) | PortState::OccupiedNotReady => {
421                 Ok(WorkerResponse::InvalidCommand(anyhow!("No empty port")))
422             }
423         }
424     }
425 
426     /// Handles plug request: Moves PortState from EmptyNotReady to OccupiedNotReady, Empty(n) to
427     /// Occupied(n+1), and schedules the next plug event if n == 0.
handle_plug_request( &mut self, hotplug_command: SignalHotPlugCommand, ) -> Result<WorkerResponse>428     fn handle_plug_request(
429         &mut self,
430         hotplug_command: SignalHotPlugCommand,
431     ) -> Result<WorkerResponse> {
432         let pci_address = hotplug_command.upstream_address;
433         let next_state = match self.get_port_state(pci_address) {
434             Ok(PortState::Empty(n)) => {
435                 self.get_port_mut(pci_address)?
436                     .add_hotplug_devices(hotplug_command.guest_devices)?;
437                 if n == 0 {
438                     self.schedule_plug_event(pci_address)?;
439                 }
440                 PortState::Occupied(n + 1)
441             }
442             Ok(PortState::EmptyNotReady) => {
443                 self.get_port_mut(pci_address)?
444                     .add_hotplug_devices(hotplug_command.guest_devices)?;
445                 PortState::OccupiedNotReady
446             }
447             Ok(PortState::Occupied(_)) | Ok(PortState::OccupiedNotReady) => {
448                 return Ok(WorkerResponse::InvalidCommand(anyhow!(
449                     "Attempt to plug into an occupied port"
450                 )))
451             }
452             Err(e) => return Ok(WorkerResponse::InvalidCommand(e)),
453         };
454         self.set_port_state(pci_address, next_state)?;
455         Ok(WorkerResponse::SignalOk)
456     }
457 
458     /// Handles unplug request: Moves PortState from OccupiedNotReady to EmptyNotReady, Occupied(n)
459     /// to Empty(n % 2 + 1), and schedules the next unplug event if n == 0.
460     ///
461     /// n % 2 + 1: When unplug request is made, it either schedule the unplug event
462     /// (n == 0 => 1 or n == 1 => 2), or cancels the corresponding plug event that has not started
463     /// (n == 2 => 1 or n == 3 => 2). Staring at the mapping, it maps n to either 1 or 2 of opposite
464     /// oddity. n % 2 + 1 is a good shorthand instead of the individual mappings.
handle_unplug_request(&mut self, pci_address: PciAddress) -> Result<WorkerResponse>465     fn handle_unplug_request(&mut self, pci_address: PciAddress) -> Result<WorkerResponse> {
466         let next_state = match self.get_port_state(pci_address) {
467             Ok(PortState::Occupied(n)) => {
468                 if n >= 2 {
469                     self.get_port_mut(pci_address)?.cancel_queued_add()?;
470                 }
471                 if n == 0 {
472                     self.schedule_unplug_event(pci_address)?;
473                 }
474                 PortState::Empty(n % 2 + 1)
475             }
476             Ok(PortState::OccupiedNotReady) => PortState::EmptyNotReady,
477             Ok(PortState::Empty(_)) | Ok(PortState::EmptyNotReady) => {
478                 return Ok(WorkerResponse::InvalidCommand(anyhow!(
479                     "Attempt to unplug from an empty port"
480                 )))
481             }
482             Err(e) => return Ok(WorkerResponse::InvalidCommand(e)),
483         };
484         self.set_port_state(pci_address, next_state)?;
485         Ok(WorkerResponse::SignalOk)
486     }
487 
488     /// Handles port ready: Moves PortState from EmptyNotReady to Empty(0), OccupiedNotReady to
489     /// Occupied(1), and schedules the next event if port is occupied
handle_port_ready(&mut self, pci_address: PciAddress) -> Result<()>490     fn handle_port_ready(&mut self, pci_address: PciAddress) -> Result<()> {
491         let next_state = match self.get_port_state(pci_address)? {
492             PortState::EmptyNotReady => PortState::Empty(0),
493             PortState::OccupiedNotReady => {
494                 self.schedule_plug_event(pci_address)?;
495                 PortState::Occupied(1)
496             }
497             PortState::Empty(_) | PortState::Occupied(_) => {
498                 bail!("Received port ready on an already enabled port");
499             }
500         };
501         self.set_port_state(pci_address, next_state)
502     }
503 
504     /// Handles plug complete: Moves PortState from Any(n) to Any(n-1), and schedules the next
505     /// unplug event unless n == 1. (Any is either Empty or Occupied.)
handle_plug_complete(&mut self, pci_address: PciAddress) -> Result<()>506     fn handle_plug_complete(&mut self, pci_address: PciAddress) -> Result<()> {
507         let (n, next_state) = match self.get_port_state(pci_address)? {
508             // Note: n - 1 >= 0 as otherwise there would be no pending events.
509             PortState::Empty(n) => (n, PortState::Empty(n - 1)),
510             PortState::Occupied(n) => (n, PortState::Occupied(n - 1)),
511             PortState::EmptyNotReady | PortState::OccupiedNotReady => {
512                 bail!("Received plug completed on a not enabled port");
513             }
514         };
515         if n > 1 {
516             self.schedule_unplug_event(pci_address)?;
517         }
518         self.set_port_state(pci_address, next_state)
519     }
520 
521     /// Handles unplug complete: Moves PortState from Any(n) to Any(n-1), and schedules the next
522     /// plug event unless n == 1. (Any is either Empty or Occupied.)
handle_unplug_complete(&mut self, pci_address: PciAddress) -> Result<()>523     fn handle_unplug_complete(&mut self, pci_address: PciAddress) -> Result<()> {
524         let (n, next_state) = match self.get_port_state(pci_address)? {
525             // Note: n - 1 >= 0 as otherwise there would be no pending events.
526             PortState::Empty(n) => (n, PortState::Empty(n - 1)),
527             PortState::Occupied(n) => (n, PortState::Occupied(n - 1)),
528             PortState::EmptyNotReady | PortState::OccupiedNotReady => {
529                 bail!("Received unplug completed on a not enabled port");
530             }
531         };
532         if n > 1 {
533             self.schedule_plug_event(pci_address)?;
534         }
535         self.set_port_state(pci_address, next_state)
536     }
537 
get_port_state(&self, pci_address: PciAddress) -> Result<PortState>538     fn get_port_state(&self, pci_address: PciAddress) -> Result<PortState> {
539         Ok(*self
540             .port_state_map
541             .get(&pci_address)
542             .context(format!("Cannot find port state on {}", pci_address))?)
543     }
544 
set_port_state(&mut self, pci_address: PciAddress, port_state: PortState) -> Result<()>545     fn set_port_state(&mut self, pci_address: PciAddress, port_state: PortState) -> Result<()> {
546         let old_port_state = self.get_port_state(pci_address)?;
547         let port = self
548             .port_map
549             .remove(&PortKey {
550                 port_state: old_port_state,
551                 pci_address,
552             })
553             .context("Cannot find port")?;
554         self.port_map.insert(
555             PortKey {
556                 port_state,
557                 pci_address,
558             },
559             port,
560         );
561         self.port_state_map.insert(pci_address, port_state);
562         Ok(())
563     }
564 
schedule_plug_event(&mut self, pci_address: PciAddress) -> Result<()>565     fn schedule_plug_event(&mut self, pci_address: PciAddress) -> Result<()> {
566         let rootbus_controller = self.rootbus_controller.clone();
567         let plug_event = self
568             .get_port_mut(pci_address)?
569             .send_hot_plug_signal(&rootbus_controller)?;
570         self.wait_ctx.add(
571             &plug_event,
572             Token::PlugComplete(plug_event.as_raw_descriptor()),
573         )?;
574         self.event_map
575             .insert(plug_event.as_raw_descriptor(), (plug_event, pci_address));
576         Ok(())
577     }
578 
schedule_unplug_event(&mut self, pci_address: PciAddress) -> Result<()>579     fn schedule_unplug_event(&mut self, pci_address: PciAddress) -> Result<()> {
580         let rootbus_controller = self.rootbus_controller.clone();
581         let unplug_event = self
582             .get_port_mut(pci_address)?
583             .send_hot_unplug_signal(&rootbus_controller)?;
584         self.wait_ctx.add(
585             &unplug_event,
586             Token::UnplugComplete(unplug_event.as_raw_descriptor()),
587         )?;
588         self.event_map.insert(
589             unplug_event.as_raw_descriptor(),
590             (unplug_event, pci_address),
591         );
592         Ok(())
593     }
594 
get_port_mut(&mut self, pci_address: PciAddress) -> Result<&mut PortWorkerStub>595     fn get_port_mut(&mut self, pci_address: PciAddress) -> Result<&mut PortWorkerStub> {
596         let port_state = self.get_port_state(pci_address)?;
597         self.port_map
598             .get_mut(&PortKey {
599                 port_state,
600                 pci_address,
601             })
602             .context("PciHotPlugWorker is in invalid state")
603     }
604 }
605 
606 /// PortState indicates the state of the port.
607 ///
608 /// The initial PortState is EmptyNotReady (EmpNR). 9 PortStates are possible, and transition
609 /// between the states are only possible by the following 3 groups of functions:
610 /// handle_port_ready(R): guest notification of port ready to accept hot plug events.
/// handle_plug_request(P) and handle_unplug_request(U): host initiated requests.
612 /// handle_plug_complete(PC) and handle_unplug_complete(UC): guest notification of event completion.
613 /// When a port is not ready, PC and UC are not expected as no events are scheduled.
614 /// The state transition is as follows:
615 ///    Emp0<-UC--Emp1<-PC--Emp2            |
616 ///  ^     \    ^    \^   ^    \^          |
617 /// /       P  /      P\ /      P\         |
618 /// |        \/        \\        \\        |
619 /// |        /\        /\\        \\       |
620 /// R       U  \      U  \U        \U      |
621 /// |      /    v    /    v\        v\     |
622 /// |  Occ0<-PC--Occ1<-UC--Occ2<-PC--Occ3  |
623 /// |              ^                       |
624 /// \              R                       |
625 ///   EmpNR<-P,U->OccNR                    |
626 
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PortState {
    /// Port is empty on crosvm. The state on the guest OS is n steps behind.
    Empty(u8),
    /// Port is empty on crosvm. The port is not enabled on the guest OS yet.
    EmptyNotReady,
    /// Port is occupied on crosvm. The state on the guest OS is n steps behind.
    Occupied(u8),
    /// Port is occupied on crosvm. The port is not enabled on the guest OS yet.
    OccupiedNotReady,
}

impl PortState {
    /// Rank of the variant, ignoring any step count. Lower means "more empty".
    fn variant_order_index(&self) -> u8 {
        match *self {
            PortState::Empty(_) => 0,
            PortState::EmptyNotReady => 1,
            PortState::Occupied(_) => 2,
            PortState::OccupiedNotReady => 3,
        }
    }
}

/// Ordering on PortState defined by "most empty".
impl Ord for PortState {
    fn cmp(&self, other: &Self) -> Ordering {
        // Variant rank decides first: Empty < EmptyNotReady < Occupied < OccupiedNotReady.
        // Within the same counted variant, fewer steps behind sorts first.
        self.variant_order_index()
            .cmp(&other.variant_order_index())
            .then_with(|| match (self, other) {
                (PortState::Empty(lhs), PortState::Empty(rhs))
                | (PortState::Occupied(lhs), PortState::Occupied(rhs)) => lhs.cmp(rhs),
                _ => Ordering::Equal,
            })
    }
}

/// Partial ordering delegates to the total ordering above.
impl PartialOrd for PortState {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
675 
/// PortKey is a unique identifier of ports with an ordering defined on it.
///
/// Ports are ordered by whose downstream device would be discovered first by the guest OS.
/// Empty ports without pending events are ordered before those with pending events. When multiple
/// empty ports without pending events are available, they are ordered by PCI enumeration.
///
/// The ordering is derived, so it compares fields in declaration order: `port_state` first, then
/// `pci_address`. Do not reorder the fields.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct PortKey {
    /// State of the port; the primary sort key.
    port_state: PortState,
    /// Address of the port; breaks ties between ports in the same state.
    pci_address: PciAddress,
}
686 
687 impl PciHotPlugManager {
688     /// Constructs PciHotPlugManager.
689     ///
690     /// Constructor uses forking, therefore has to be called early, before crosvm enters a
691     /// multi-threaded context.
new( guest_memory: GuestMemory, config: &Config, #[cfg(feature = "swap")] swap_device_helper: Option<SwapDeviceHelper>, ) -> Result<Self>692     pub fn new(
693         guest_memory: GuestMemory,
694         config: &Config,
695         #[cfg(feature = "swap")] swap_device_helper: Option<SwapDeviceHelper>,
696     ) -> Result<Self> {
697         let jail_warden: Box<dyn JailWarden> = match config.jail_config {
698             Some(_) => Box::new(
699                 JailWardenImpl::new(
700                     guest_memory,
701                     config,
702                     #[cfg(feature = "swap")]
703                     swap_device_helper,
704                 )
705                 .context("jail warden construction")?,
706             ),
707             None => Box::new(
708                 PermissiveJailWarden::new(
709                     guest_memory,
710                     config,
711                     #[cfg(feature = "swap")]
712                     swap_device_helper,
713                 )
714                 .context("jail warden construction")?,
715             ),
716         };
717         Ok(Self {
718             jail_warden,
719             port_stubs: BTreeMap::new(),
720             bus_address_map: BTreeMap::new(),
721             worker_client: None,
722         })
723     }
724 
725     /// Starts PciHotPlugManager. Required before any other commands.
726     ///
727     /// PciHotPlugManager::new must be called in a single-threaded context as it forks.
728     /// However, rootbus_controller is only available after VM boots when crosvm is multi-threaded.
729     ///
730     /// TODO(293801301): Remove unused after aarch64 support
731     #[allow(unused)]
set_rootbus_controller( &mut self, rootbus_controller: mpsc::Sender<PciRootCommand>, ) -> Result<()>732     pub fn set_rootbus_controller(
733         &mut self,
734         rootbus_controller: mpsc::Sender<PciRootCommand>,
735     ) -> Result<()> {
736         // Spins the PciHotPlugWorker.
737         self.worker_client = Some(WorkerClient::new(rootbus_controller)?);
738         Ok(())
739     }
740 
741     /// Adds a hotplug capable port to manage.
742     ///
743     /// PciHotPlugManager assumes exclusive control for adding and removing devices to this port.
744     /// TODO(293801301): Remove unused_variables after aarch64 support
745     #[allow(unused)]
add_port(&mut self, port: Arc<Mutex<dyn HotPlugBus>>) -> Result<()>746     pub fn add_port(&mut self, port: Arc<Mutex<dyn HotPlugBus>>) -> Result<()> {
747         let worker_client = self
748             .worker_client
749             .as_ref()
750             .context("No worker thread. Is set_rootbus_controller not called?")?;
751         let port_lock = port.lock();
752         // Rejects hotplug bus with downstream devices.
753         if !port_lock.is_empty() {
754             bail!("invalid hotplug bus");
755         }
756         let pci_address = port_lock
757             .get_address()
758             .context("Hotplug bus PCI address missing")?;
759         // Reject hotplug buses not on rootbus, since otherwise the order of enumeration depends on
760         // the topology of PCI.
761         if pci_address.bus != 0 {
762             bail!("hotplug port on non-root bus not supported");
763         }
764         let downstream_bus = port_lock
765             .get_secondary_bus_number()
766             .context("cannot get downstream bus")?;
767         drop(port_lock);
768         if let Some(prev_address) = self.bus_address_map.insert(downstream_bus, pci_address) {
769             bail!(
770                 "Downstream bus of new port is conflicting with previous port at {}",
771                 &prev_address
772             );
773         }
774         self.port_stubs.insert(
775             pci_address,
776             PortManagerStub {
777                 downstream_bus,
778                 devices: HashMap::new(),
779             },
780         );
781         match worker_client.send_worker_command(WorkerCommand::AddPort(
782             pci_address,
783             PortWorkerStub::new(port, downstream_bus)?,
784         ))? {
785             WorkerResponse::AddPortOk => Ok(()),
786             WorkerResponse::InvalidCommand(e) => Err(e),
787             r => bail!("Unexpected response from worker: {:?}", &r),
788         }
789     }
790 
    /// hotplugs up to 8 PCI devices as "functions of a device" (in PCI Bus Device Function sense).
    ///
    /// The worker picks an empty port. Each resource carrier then gets an address on that port's
    /// downstream bus (function number = its index in `resource_carriers`), an IRQ level event
    /// registered with the IRQ chip, and a proxy device made by the jail warden. Finally the
    /// worker is asked to signal the hotplug to the guest.
    ///
    /// Fails if `set_rootbus_controller` has not been called, if `resource_carriers` does not
    /// hold 1 to 8 entries, if the worker reports an error, or if any per-device step fails.
    ///
    /// returns the bus number of the bus on success.
    pub fn hotplug_device<V: VmArch, Vcpu: VcpuArch>(
        &mut self,
        resource_carriers: Vec<ResourceCarrier>,
        linux: &mut RunnableLinuxVm<V, Vcpu>,
        resources: &mut SystemAllocator,
    ) -> Result<u8> {
        let worker_client = self
            .worker_client
            .as_ref()
            .context("No worker thread. Is set_rootbus_controller not called?")?;
        if resource_carriers.len() > 8 || resource_carriers.is_empty() {
            bail!("PCI function count has to be 1 to 8 inclusive");
        }
        // The worker decides which empty port receives the devices.
        let pci_address = match worker_client.send_worker_command(WorkerCommand::GetEmptyPort)? {
            WorkerResponse::GetEmptyPortOk(p) => Ok(p),
            WorkerResponse::InvalidCommand(e) => Err(e),
            r => bail!("Unexpected response from worker: {:?}", &r),
        }?;
        let port_stub = self
            .port_stubs
            .get_mut(&pci_address)
            .context("Cannot find port")?;
        let downstream_bus = port_stub.downstream_bus;
        let mut devices = Vec::new();
        // NOTE(review): a failure in a later iteration returns early without undoing what earlier
        // iterations set up (allocated addresses, registered IRQ events, `port_stub.devices`
        // entries) — confirm whether a rollback is needed.
        for (func_num, mut resource_carrier) in resource_carriers.into_iter().enumerate() {
            // Presumably (segment, bus, device, function): function `func_num` of device 0 on
            // the downstream bus — verify against PciAddress::new.
            let device_address = PciAddress::new(0, downstream_bus as u32, 0, func_num as u32)?;
            let hotplug_key = HotPlugKey::GuestDevice {
                guest_addr: device_address,
            };
            resource_carrier.allocate_address(device_address, resources)?;
            let irq_evt = IrqLevelEvent::new()?;
            // Pin and IRQ number depend only on the downstream bus number, so every function
            // plugged into this port uses the same pair.
            let (pin, irq_num) = match downstream_bus % 4 {
                0 => (PciInterruptPin::IntA, 0),
                1 => (PciInterruptPin::IntB, 1),
                2 => (PciInterruptPin::IntC, 2),
                _ => (PciInterruptPin::IntD, 3),
            };
            // The carrier gets a clone; the original event is kept for registration and for the
            // bookkeeping entry below.
            resource_carrier.assign_irq(irq_evt.try_clone()?, pin, irq_num);
            let (proxy_device, pid) = self
                .jail_warden
                .make_proxy_device(resource_carrier)
                .context("make proxy device")?;
            let device_id = proxy_device.lock().device_id();
            let device_name = proxy_device.lock().debug_label();
            linux.irq_chip.as_irq_chip_mut().register_level_irq_event(
                irq_num,
                &irq_evt,
                IrqEventSource {
                    device_id,
                    queue_id: 0,
                    device_name: device_name.clone(),
                },
            )?;
            // A pid that does not fit in u32 is reported as a fork failure.
            let pid: u32 = pid.try_into().context("fork fail")?;
            // Only positive pids are recorded; presumably 0 means no child process was forked —
            // TODO confirm against JailWarden::make_proxy_device.
            if pid > 0 {
                linux.pid_debug_label_map.insert(pid, device_name);
            }
            devices.push(GuestDeviceStub {
                pci_addr: device_address,
                key: hotplug_key,
                device: proxy_device,
            });
            // Remember what remove_hotplug_device has to release for this device.
            port_stub
                .devices
                .insert(device_address, RecoverableResource { irq_num, irq_evt });
        }
        // Ask worker to schedule hotplug signal.
        match worker_client.send_worker_command(WorkerCommand::SignalHotPlug(
            SignalHotPlugCommand::new(pci_address, devices)?,
        ))? {
            WorkerResponse::SignalOk => Ok(downstream_bus),
            WorkerResponse::InvalidCommand(e) => Err(e),
            r => bail!("Unexpected response from worker: {:?}", &r),
        }
    }
869 
870     /// Removes all hotplugged devices on the hotplug bus.
remove_hotplug_device<V: VmArch, Vcpu: VcpuArch>( &mut self, bus: u8, linux: &mut RunnableLinuxVm<V, Vcpu>, resources: &mut SystemAllocator, ) -> Result<()>871     pub fn remove_hotplug_device<V: VmArch, Vcpu: VcpuArch>(
872         &mut self,
873         bus: u8,
874         linux: &mut RunnableLinuxVm<V, Vcpu>,
875         resources: &mut SystemAllocator,
876     ) -> Result<()> {
877         let worker_client = self
878             .worker_client
879             .as_ref()
880             .context("No worker thread. Is set_rootbus_controller not called?")?;
881         let pci_address = self
882             .bus_address_map
883             .get(&bus)
884             .context(format!("Port {} is not known", &bus))?;
885         match worker_client.send_worker_command(WorkerCommand::GetPortState(*pci_address))? {
886             WorkerResponse::GetPortStateOk(PortState::Occupied(_)) => {}
887             WorkerResponse::GetPortStateOk(PortState::Empty(_)) => {
888                 bail!("Port {} is empty", &bus)
889             }
890             WorkerResponse::InvalidCommand(e) => {
891                 return Err(e);
892             }
893             wr => bail!("Unexpected response from worker: {:?}", &wr),
894         };
895         // Performs a surprise removal. That is, not waiting for hot removal completion before
896         // deleting the resources.
897         match worker_client.send_worker_command(WorkerCommand::SignalHotUnplug(*pci_address))? {
898             WorkerResponse::SignalOk => {}
899             WorkerResponse::InvalidCommand(e) => {
900                 return Err(e);
901             }
902             wr => bail!("Unexpected response from worker: {:?}", &wr),
903         }
904         // Remove all devices on the hotplug bus.
905         let port_stub = self
906             .port_stubs
907             .get_mut(pci_address)
908             .context(format!("Port {} is not known", &bus))?;
909         for (downstream_address, recoverable_resource) in port_stub.devices.drain() {
910             // port_stub.port does not have remove_hotplug_device method, as devices are removed
911             // when hot_unplug is called.
912             resources.release_pci(
913                 downstream_address.bus,
914                 downstream_address.dev,
915                 downstream_address.func,
916             );
917             linux.irq_chip.unregister_level_irq_event(
918                 recoverable_resource.irq_num,
919                 &recoverable_resource.irq_evt,
920             )?;
921         }
922         Ok(())
923     }
924 }
925 
926 #[cfg(test)]
927 mod tests {
928     use std::thread;
929     use std::time::Duration;
930 
931     use devices::DeviceId;
932     use devices::Suspendable;
933     use serde::Deserialize;
934     use serde::Serialize;
935 
936     use super::*;
937 
    /// A MockPort that only supports hot_plug and hot_unplug commands, and signaling command
    /// complete manually, which is sufficient for PciHotPlugWorker unit test.
    struct MockPort {
        /// Command complete event. Replaced by a fresh event on every hot_plug / hot_unplug call,
        /// which returns a clone of it; signal_cc fires it.
        cc_event: Event,
        /// Bus number reported by get_secondary_bus_number.
        downstream_bus: u8,
        /// Clones of the events returned by get_ready_notification that signal_ready has not
        /// fired yet.
        ready_events: Vec<Event>,
    }
945 
946     impl MockPort {
new(downstream_bus: u8) -> Self947         fn new(downstream_bus: u8) -> Self {
948             Self {
949                 cc_event: Event::new().unwrap(),
950                 downstream_bus,
951                 ready_events: Vec::new(),
952             }
953         }
954 
signal_cc(&self)955         fn signal_cc(&self) {
956             self.cc_event.reset().unwrap();
957             self.cc_event.signal().unwrap();
958         }
959 
signal_ready(&mut self)960         fn signal_ready(&mut self) {
961             for event in self.ready_events.drain(..) {
962                 event.reset().unwrap();
963                 event.signal().unwrap();
964             }
965         }
966     }
967 
    /// Field-less stand-in for a hotplugged device; the tests in this module use it as the
    /// `device` of a GuestDeviceStub.
    #[derive(Copy, Clone, Serialize, Deserialize, Eq, PartialEq, Debug)]
    struct MockDevice;
970 
971     impl Suspendable for MockDevice {
snapshot(&mut self) -> anyhow::Result<serde_json::Value>972         fn snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
973             serde_json::to_value(self).context("error serializing")
974         }
975 
restore(&mut self, data: serde_json::Value) -> anyhow::Result<()>976         fn restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
977             *self = serde_json::from_value(data).context("error deserializing")?;
978             Ok(())
979         }
980 
sleep(&mut self) -> anyhow::Result<()>981         fn sleep(&mut self) -> anyhow::Result<()> {
982             Ok(())
983         }
984 
wake(&mut self) -> anyhow::Result<()>985         fn wake(&mut self) -> anyhow::Result<()> {
986             Ok(())
987         }
988     }
989 
990     impl BusDevice for MockDevice {
device_id(&self) -> DeviceId991         fn device_id(&self) -> DeviceId {
992             DeviceId::try_from(0xdead_beef).unwrap()
993         }
debug_label(&self) -> String994         fn debug_label(&self) -> String {
995             "mock device".to_owned()
996         }
997     }
998 
999     impl HotPlugBus for MockPort {
hot_plug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>>1000         fn hot_plug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>> {
1001             self.cc_event = Event::new().unwrap();
1002             Ok(Some(self.cc_event.try_clone().unwrap()))
1003         }
1004 
hot_unplug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>>1005         fn hot_unplug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>> {
1006             self.cc_event = Event::new().unwrap();
1007             Ok(Some(self.cc_event.try_clone().unwrap()))
1008         }
1009 
get_ready_notification(&mut self) -> anyhow::Result<Event>1010         fn get_ready_notification(&mut self) -> anyhow::Result<Event> {
1011             let event = Event::new()?;
1012             self.ready_events.push(event.try_clone()?);
1013             Ok(event)
1014         }
1015 
is_match(&self, _host_addr: PciAddress) -> Option<u8>1016         fn is_match(&self, _host_addr: PciAddress) -> Option<u8> {
1017             None
1018         }
1019 
get_address(&self) -> Option<PciAddress>1020         fn get_address(&self) -> Option<PciAddress> {
1021             None
1022         }
1023 
get_secondary_bus_number(&self) -> Option<u8>1024         fn get_secondary_bus_number(&self) -> Option<u8> {
1025             Some(self.downstream_bus)
1026         }
1027 
add_hotplug_device(&mut self, _hotplug_key: HotPlugKey, _guest_addr: PciAddress)1028         fn add_hotplug_device(&mut self, _hotplug_key: HotPlugKey, _guest_addr: PciAddress) {}
1029 
get_hotplug_device(&self, _hotplug_key: HotPlugKey) -> Option<PciAddress>1030         fn get_hotplug_device(&self, _hotplug_key: HotPlugKey) -> Option<PciAddress> {
1031             None
1032         }
1033 
is_empty(&self) -> bool1034         fn is_empty(&self) -> bool {
1035             true
1036         }
1037 
get_hotplug_key(&self) -> Option<HotPlugKey>1038         fn get_hotplug_key(&self) -> Option<HotPlugKey> {
1039             None
1040         }
1041     }
1042 
new_port(downstream_bus: u8) -> Arc<Mutex<MockPort>>1043     fn new_port(downstream_bus: u8) -> Arc<Mutex<MockPort>> {
1044         Arc::new(Mutex::new(MockPort::new(downstream_bus)))
1045     }
1046 
poll_until_with_timeout<F>(f: F, timeout: Duration) -> bool where F: Fn() -> bool,1047     fn poll_until_with_timeout<F>(f: F, timeout: Duration) -> bool
1048     where
1049         F: Fn() -> bool,
1050     {
1051         for _ in 0..timeout.as_millis() {
1052             if f() {
1053                 return true;
1054             }
1055             thread::sleep(Duration::from_millis(1));
1056         }
1057         false
1058     }
1059 
1060     #[test]
worker_empty_port_ordering()1061     fn worker_empty_port_ordering() {
1062         let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1063         let client = WorkerClient::new(rootbus_controller).unwrap();
1064         // Port A: upstream 00:01.1, downstream 2.
1065         let upstream_addr_a = PciAddress {
1066             bus: 0,
1067             dev: 1,
1068             func: 1,
1069         };
1070         let bus_a = 2;
1071         let downstream_addr_a = PciAddress {
1072             bus: bus_a,
1073             dev: 0,
1074             func: 0,
1075         };
1076         let hotplug_key_a = HotPlugKey::GuestDevice {
1077             guest_addr: downstream_addr_a,
1078         };
1079         let device_a = GuestDeviceStub {
1080             pci_addr: downstream_addr_a,
1081             key: hotplug_key_a,
1082             device: Arc::new(Mutex::new(MockDevice)),
1083         };
1084         let hotplug_command_a =
1085             SignalHotPlugCommand::new(upstream_addr_a, [device_a].to_vec()).unwrap();
1086         let port_a = new_port(bus_a);
1087         // Port B: upstream 00:01.0, downstream 3.
1088         let upstream_addr_b = PciAddress {
1089             bus: 0,
1090             dev: 1,
1091             func: 0,
1092         };
1093         let bus_b = 3;
1094         let downstream_addr_b = PciAddress {
1095             bus: bus_b,
1096             dev: 0,
1097             func: 0,
1098         };
1099         let hotplug_key_b = HotPlugKey::GuestDevice {
1100             guest_addr: downstream_addr_b,
1101         };
1102         let device_b = GuestDeviceStub {
1103             pci_addr: downstream_addr_b,
1104             key: hotplug_key_b,
1105             device: Arc::new(Mutex::new(MockDevice)),
1106         };
1107         let hotplug_command_b =
1108             SignalHotPlugCommand::new(upstream_addr_b, [device_b].to_vec()).unwrap();
1109         let port_b = new_port(bus_b);
1110         // Port C: upstream 00:02.0, downstream 4.
1111         let upstream_addr_c = PciAddress {
1112             bus: 0,
1113             dev: 2,
1114             func: 0,
1115         };
1116         let bus_c = 4;
1117         let downstream_addr_c = PciAddress {
1118             bus: bus_c,
1119             dev: 0,
1120             func: 0,
1121         };
1122         let hotplug_key_c = HotPlugKey::GuestDevice {
1123             guest_addr: downstream_addr_c,
1124         };
1125         let device_c = GuestDeviceStub {
1126             pci_addr: downstream_addr_c,
1127             key: hotplug_key_c,
1128             device: Arc::new(Mutex::new(MockDevice)),
1129         };
1130         let hotplug_command_c =
1131             SignalHotPlugCommand::new(upstream_addr_c, [device_c].to_vec()).unwrap();
1132         let port_c = new_port(bus_c);
1133         assert_eq!(
1134             WorkerResponse::AddPortOk,
1135             client
1136                 .send_worker_command(WorkerCommand::AddPort(
1137                     upstream_addr_a,
1138                     PortWorkerStub::new(port_a.clone(), bus_a).unwrap()
1139                 ))
1140                 .unwrap()
1141         );
1142         assert_eq!(
1143             WorkerResponse::AddPortOk,
1144             client
1145                 .send_worker_command(WorkerCommand::AddPort(
1146                     upstream_addr_b,
1147                     PortWorkerStub::new(port_b.clone(), bus_b).unwrap()
1148                 ))
1149                 .unwrap()
1150         );
1151         assert_eq!(
1152             WorkerResponse::AddPortOk,
1153             client
1154                 .send_worker_command(WorkerCommand::AddPort(
1155                     upstream_addr_c,
1156                     PortWorkerStub::new(port_c.clone(), bus_c).unwrap()
1157                 ))
1158                 .unwrap()
1159         );
1160         port_a.lock().signal_ready();
1161         assert!(poll_until_with_timeout(
1162             || client
1163                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_a))
1164                 .unwrap()
1165                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1166             Duration::from_millis(500)
1167         ));
1168         port_b.lock().signal_ready();
1169         assert!(poll_until_with_timeout(
1170             || client
1171                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_b))
1172                 .unwrap()
1173                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1174             Duration::from_millis(500)
1175         ));
1176         port_c.lock().signal_ready();
1177         assert!(poll_until_with_timeout(
1178             || client
1179                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_c))
1180                 .unwrap()
1181                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1182             Duration::from_millis(500)
1183         ));
1184         // All ports empty and in sync. Should get port B.
1185         assert_eq!(
1186             WorkerResponse::GetEmptyPortOk(upstream_addr_b),
1187             client
1188                 .send_worker_command(WorkerCommand::GetEmptyPort)
1189                 .unwrap()
1190         );
1191         assert_eq!(
1192             WorkerResponse::SignalOk,
1193             client
1194                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_b))
1195                 .unwrap()
1196         );
1197         // Should get port A.
1198         assert_eq!(
1199             WorkerResponse::GetEmptyPortOk(upstream_addr_a),
1200             client
1201                 .send_worker_command(WorkerCommand::GetEmptyPort)
1202                 .unwrap()
1203         );
1204         assert_eq!(
1205             WorkerResponse::SignalOk,
1206             client
1207                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_a))
1208                 .unwrap()
1209         );
1210         // Should get port C.
1211         assert_eq!(
1212             WorkerResponse::GetEmptyPortOk(upstream_addr_c),
1213             client
1214                 .send_worker_command(WorkerCommand::GetEmptyPort)
1215                 .unwrap()
1216         );
1217         assert_eq!(
1218             WorkerResponse::SignalOk,
1219             client
1220                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_c))
1221                 .unwrap()
1222         );
1223         // Should get an error since no port is empty.
1224         if let WorkerResponse::InvalidCommand(_) = client
1225             .send_worker_command(WorkerCommand::GetEmptyPort)
1226             .unwrap()
1227         {
1228             // Assert result is of Error type.
1229         } else {
1230             unreachable!();
1231         }
1232         // Remove device from port A, immediately it should be available.
1233         assert_eq!(
1234             WorkerResponse::SignalOk,
1235             client
1236                 .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr_a))
1237                 .unwrap()
1238         );
1239         assert_eq!(
1240             WorkerResponse::GetEmptyPortOk(upstream_addr_a),
1241             client
1242                 .send_worker_command(WorkerCommand::GetEmptyPort)
1243                 .unwrap()
1244         );
1245         // Moreover, it should be 2 steps behind.
1246         assert_eq!(
1247             WorkerResponse::GetPortStateOk(PortState::Empty(2)),
1248             client
1249                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_a))
1250                 .unwrap()
1251         );
1252     }
1253 
1254     #[test]
worker_port_state_transitions()1255     fn worker_port_state_transitions() {
1256         let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1257         let client = WorkerClient::new(rootbus_controller).unwrap();
1258         let upstream_addr = PciAddress {
1259             bus: 0,
1260             dev: 1,
1261             func: 1,
1262         };
1263         let bus = 2;
1264         let downstream_addr = PciAddress {
1265             bus,
1266             dev: 0,
1267             func: 0,
1268         };
1269         let hotplug_key = HotPlugKey::GuestDevice {
1270             guest_addr: downstream_addr,
1271         };
1272         let device = GuestDeviceStub {
1273             pci_addr: downstream_addr,
1274             key: hotplug_key,
1275             device: Arc::new(Mutex::new(MockDevice)),
1276         };
1277         let hotplug_command = SignalHotPlugCommand::new(upstream_addr, [device].to_vec()).unwrap();
1278         let port = new_port(bus);
1279         assert_eq!(
1280             WorkerResponse::AddPortOk,
1281             client
1282                 .send_worker_command(WorkerCommand::AddPort(
1283                     upstream_addr,
1284                     PortWorkerStub::new(port.clone(), bus).unwrap()
1285                 ))
1286                 .unwrap()
1287         );
1288         port.lock().signal_ready();
1289         assert!(poll_until_with_timeout(
1290             || client
1291                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1292                 .unwrap()
1293                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1294             Duration::from_millis(500)
1295         ));
1296         assert_eq!(
1297             WorkerResponse::SignalOk,
1298             client
1299                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1300                 .unwrap()
1301         );
1302         assert!(poll_until_with_timeout(
1303             || client
1304                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1305                 .unwrap()
1306                 == WorkerResponse::GetPortStateOk(PortState::Occupied(1)),
1307             Duration::from_millis(500)
1308         ));
1309         assert_eq!(
1310             WorkerResponse::SignalOk,
1311             client
1312                 .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr))
1313                 .unwrap()
1314         );
1315         assert!(poll_until_with_timeout(
1316             || client
1317                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1318                 .unwrap()
1319                 == WorkerResponse::GetPortStateOk(PortState::Empty(2)),
1320             Duration::from_millis(500)
1321         ));
1322         assert_eq!(
1323             WorkerResponse::SignalOk,
1324             client
1325                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1326                 .unwrap()
1327         );
1328         assert!(poll_until_with_timeout(
1329             || client
1330                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1331                 .unwrap()
1332                 == WorkerResponse::GetPortStateOk(PortState::Occupied(3)),
1333             Duration::from_millis(500)
1334         ));
1335         port.lock().signal_cc();
1336         assert!(poll_until_with_timeout(
1337             || client
1338                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1339                 .unwrap()
1340                 == WorkerResponse::GetPortStateOk(PortState::Occupied(2)),
1341             Duration::from_millis(500)
1342         ));
1343         assert_eq!(
1344             WorkerResponse::SignalOk,
1345             client
1346                 .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr))
1347                 .unwrap()
1348         );
1349         // Moves from Occupied(2) to Empty(1) since it is redundant to unplug a device that is yet
1350         // to be plugged in.
1351         assert!(poll_until_with_timeout(
1352             || client
1353                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1354                 .unwrap()
1355                 == WorkerResponse::GetPortStateOk(PortState::Empty(1)),
1356             Duration::from_millis(500)
1357         ));
1358         port.lock().signal_cc();
1359         assert!(poll_until_with_timeout(
1360             || client
1361                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1362                 .unwrap()
1363                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1364             Duration::from_millis(500)
1365         ));
1366     }
1367 
1368     #[test]
worker_port_early_plug_state_transitions()1369     fn worker_port_early_plug_state_transitions() {
1370         let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1371         let client = WorkerClient::new(rootbus_controller).unwrap();
1372         let upstream_addr = PciAddress {
1373             bus: 0,
1374             dev: 1,
1375             func: 1,
1376         };
1377         let bus = 2;
1378         let downstream_addr = PciAddress {
1379             bus,
1380             dev: 0,
1381             func: 0,
1382         };
1383         let hotplug_key = HotPlugKey::GuestDevice {
1384             guest_addr: downstream_addr,
1385         };
1386         let device = GuestDeviceStub {
1387             pci_addr: downstream_addr,
1388             key: hotplug_key,
1389             device: Arc::new(Mutex::new(MockDevice)),
1390         };
1391         let hotplug_command = SignalHotPlugCommand::new(upstream_addr, [device].to_vec()).unwrap();
1392         let port = new_port(bus);
1393         assert_eq!(
1394             WorkerResponse::AddPortOk,
1395             client
1396                 .send_worker_command(WorkerCommand::AddPort(
1397                     upstream_addr,
1398                     PortWorkerStub::new(port.clone(), bus).unwrap()
1399                 ))
1400                 .unwrap()
1401         );
1402         assert!(poll_until_with_timeout(
1403             || client
1404                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1405                 .unwrap()
1406                 == WorkerResponse::GetPortStateOk(PortState::EmptyNotReady),
1407             Duration::from_millis(500)
1408         ));
1409         assert_eq!(
1410             WorkerResponse::SignalOk,
1411             client
1412                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1413                 .unwrap()
1414         );
1415         assert!(poll_until_with_timeout(
1416             || client
1417                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1418                 .unwrap()
1419                 == WorkerResponse::GetPortStateOk(PortState::OccupiedNotReady),
1420             Duration::from_millis(500)
1421         ));
1422         port.lock().signal_ready();
1423         assert!(poll_until_with_timeout(
1424             || client
1425                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1426                 .unwrap()
1427                 == WorkerResponse::GetPortStateOk(PortState::Occupied(1)),
1428             Duration::from_millis(500)
1429         ));
1430         port.lock().signal_cc();
1431         assert!(poll_until_with_timeout(
1432             || client
1433                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1434                 .unwrap()
1435                 == WorkerResponse::GetPortStateOk(PortState::Occupied(0)),
1436             Duration::from_millis(500)
1437         ));
1438     }
1439 }
1440