xref: /aosp_15_r20/external/crosvm/vm_control/src/balloon_tube.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2023 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Balloon related control APIs.
6 
7 use std::collections::VecDeque;
8 
9 use anyhow::bail;
10 use anyhow::Context;
11 use anyhow::Result;
12 use balloon_control::BalloonTubeCommand;
13 use balloon_control::BalloonTubeResult;
14 use base::error;
15 use base::Error as SysError;
16 use base::Tube;
17 use serde::Deserialize;
18 use serde::Serialize;
19 
20 use crate::VmResponse;
21 
/// Balloon commands that are sent on the crosvm control socket.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum BalloonControlCommand {
    /// Set the size of the VM's balloon.
    Adjust {
        /// Requested balloon size, in bytes.
        num_bytes: u64,
        /// When `true`, the reply is withheld until the balloon tube reports an
        /// adjusted size equal to `num_bytes`. When `false`, `VmResponse::Ok` is
        /// produced as soon as the request has been sent.
        wait_for_success: bool,
    },
    /// Request balloon statistics; answered with `VmResponse::BalloonStats`.
    Stats,
    /// Request the working set; answered with `VmResponse::BalloonWS`.
    WorkingSet,
    /// Working set configuration, forwarded field-for-field as
    /// `BalloonTubeCommand::WorkingSetConfig`.
    // NOTE(review): the meaning of the three fields is defined by the balloon
    // device, not by this file — confirm against `balloon_control`.
    WorkingSetConfig {
        bins: Vec<u32>,
        refresh_threshold: u32,
        report_threshold: u32,
    },
}
38 
do_send(tube: &Tube, cmd: &BalloonControlCommand) -> Option<VmResponse>39 fn do_send(tube: &Tube, cmd: &BalloonControlCommand) -> Option<VmResponse> {
40     match *cmd {
41         BalloonControlCommand::Adjust {
42             num_bytes,
43             wait_for_success,
44         } => {
45             match tube.send(&BalloonTubeCommand::Adjust {
46                 num_bytes,
47                 allow_failure: wait_for_success,
48             }) {
49                 Ok(_) => {
50                     if wait_for_success {
51                         None
52                     } else {
53                         Some(VmResponse::Ok)
54                     }
55                 }
56                 Err(_) => Some(VmResponse::Err(SysError::last())),
57             }
58         }
59         BalloonControlCommand::WorkingSetConfig {
60             ref bins,
61             refresh_threshold,
62             report_threshold,
63         } => {
64             match tube.send(&BalloonTubeCommand::WorkingSetConfig {
65                 bins: bins.clone(),
66                 refresh_threshold,
67                 report_threshold,
68             }) {
69                 Ok(_) => Some(VmResponse::Ok),
70                 Err(_) => Some(VmResponse::Err(SysError::last())),
71             }
72         }
73         BalloonControlCommand::Stats => match tube.send(&BalloonTubeCommand::Stats) {
74             Ok(_) => None,
75             Err(_) => Some(VmResponse::Err(SysError::last())),
76         },
77         BalloonControlCommand::WorkingSet => match tube.send(&BalloonTubeCommand::WorkingSet) {
78             Ok(_) => None,
79             Err(_) => Some(VmResponse::Err(SysError::last())),
80         },
81     }
82 }
83 
/// Utility for multiplexing a balloon tube between multiple control tubes. Commands
/// are sent and processed serially.
pub struct BalloonTube {
    /// Tube on which `BalloonTubeCommand`s are sent and `BalloonTubeResult`s are read.
    tube: Tube,
    /// Commands awaiting a result, oldest first, each with the optional key its
    /// response is reported under. The front entry has already been sent and is
    /// waiting for its result; entries behind it have not been sent yet.
    pending_queue: VecDeque<(BalloonControlCommand, Option<usize>)>,
    /// Target size and reply key of the latest `Adjust` sent with
    /// `wait_for_success: true` that has not been confirmed yet.
    pending_adjust_with_completion: Option<(u64, usize)>,
}
91 
92 impl BalloonTube {
new(tube: Tube) -> Self93     pub fn new(tube: Tube) -> Self {
94         BalloonTube {
95             tube,
96             pending_queue: VecDeque::new(),
97             pending_adjust_with_completion: None,
98         }
99     }
100 
101     /// Sends or queues the given command to this tube. Associates the
102     /// response with the given key.
send_cmd( &mut self, cmd: BalloonControlCommand, key: Option<usize>, ) -> Option<(VmResponse, usize)>103     pub fn send_cmd(
104         &mut self,
105         cmd: BalloonControlCommand,
106         key: Option<usize>,
107     ) -> Option<(VmResponse, usize)> {
108         match cmd {
109             BalloonControlCommand::Adjust {
110                 wait_for_success: true,
111                 num_bytes,
112             } => {
113                 let Some(key) = key else {
114                     error!("Asked for completion without reply key");
115                     return None;
116                 };
117                 let resp = self
118                     .pending_adjust_with_completion
119                     .take()
120                     .map(|(_, key)| (VmResponse::ErrString("Adjust overriden".to_string()), key));
121                 if do_send(&self.tube, &cmd).is_some() {
122                     unreachable!("Unexpected early reply");
123                 }
124                 self.pending_adjust_with_completion = Some((num_bytes, key));
125                 resp
126             }
127             _ => {
128                 if !self.pending_queue.is_empty() {
129                     self.pending_queue.push_back((cmd, key));
130                     return None;
131                 }
132                 let resp = do_send(&self.tube, &cmd);
133                 if resp.is_none() {
134                     self.pending_queue.push_back((cmd, key));
135                 }
136                 match key {
137                     None => None,
138                     Some(key) => resp.map(|r| (r, key)),
139                 }
140             }
141         }
142     }
143 
144     /// Receives responses from the balloon tube, and returns them with
145     /// their assoicated keys.
recv(&mut self) -> Result<Vec<(VmResponse, usize)>>146     pub fn recv(&mut self) -> Result<Vec<(VmResponse, usize)>> {
147         let res = self
148             .tube
149             .recv::<BalloonTubeResult>()
150             .context("failed to read balloon tube")?;
151         if let BalloonTubeResult::Adjusted { num_bytes: actual } = res {
152             let Some((target, key)) = self.pending_adjust_with_completion else {
153                 bail!("Unexpected balloon adjust to {}", actual);
154             };
155             if actual != target {
156                 return Ok(vec![]);
157             }
158             self.pending_adjust_with_completion.take();
159             return Ok(vec![(VmResponse::Ok, key)]);
160         }
161         let mut responses = vec![];
162         if self.pending_queue.is_empty() {
163             bail!("Unexpected balloon tube result {:?}", res)
164         }
165         let resp = match (
166             &self.pending_queue.front().expect("entry disappeared").0,
167             res,
168         ) {
169             (
170                 BalloonControlCommand::Stats,
171                 BalloonTubeResult::Stats {
172                     stats,
173                     balloon_actual,
174                 },
175             ) => VmResponse::BalloonStats {
176                 stats,
177                 balloon_actual,
178             },
179             (
180                 BalloonControlCommand::WorkingSet,
181                 BalloonTubeResult::WorkingSet { ws, balloon_actual },
182             ) => VmResponse::BalloonWS { ws, balloon_actual },
183             (_, resp) => {
184                 bail!("Unexpected balloon tube result {:?}", resp);
185             }
186         };
187         let key = self.pending_queue.pop_front().expect("entry disappeared").1;
188         if let Some(key) = key {
189             responses.push((resp, key))
190         }
191         while let Some((cmd, key)) = self.pending_queue.front() {
192             match do_send(&self.tube, cmd) {
193                 Some(resp) => {
194                     if let Some(key) = key {
195                         responses.push((resp, *key));
196                     }
197                     self.pending_queue.pop_front();
198                 }
199                 None => break,
200             }
201         }
202         Ok(responses)
203     }
204 }
205 
#[cfg(test)]
mod tests {
    use balloon_control::BalloonStats;

    use super::*;

    /// Plays the other end of the tube for one stats round trip: expects a
    /// `Stats` command on `device` and answers it with default stats.
    fn balloon_device_respond_stats(device: &Tube) {
        let BalloonTubeCommand::Stats = device.recv::<BalloonTubeCommand>().unwrap() else {
            panic!("unexpected command");
        };

        device
            .send(&BalloonTubeResult::Stats {
                stats: BalloonStats::default(),
                balloon_actual: 0,
            })
            .unwrap();
    }

    /// A single stats request produces no immediate response and is answered
    /// under its key once the result has been received.
    #[test]
    fn test_stat_command() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);

        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());

        balloon_device_respond_stats(&device);

        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
    }

    /// Two stats requests are served one after the other, each `recv` yielding
    /// exactly one response, in request order.
    #[test]
    fn test_multiple_stat_command() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);

        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());
        // Queued behind the first request, so still no immediate response.
        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xbadcafe));
        assert!(resp.is_none());

        balloon_device_respond_stats(&device);

        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));

        // The second `Stats` command is only seen here, after the first completed.
        balloon_device_respond_stats(&device);

        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xbadcafe);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
    }

    /// A non-waiting adjust queued behind a stats request is sent when the
    /// stats result arrives, and its `Ok` is returned from the same `recv`.
    #[test]
    fn test_queued_stats_adjust_no_reply() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);

        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());
        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0,
                wait_for_success: false,
            },
            Some(0xbadcafe),
        );
        // Queued, not sent: the stats request is still outstanding.
        assert!(resp.is_none());

        balloon_device_respond_stats(&device);

        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 2);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
        assert_eq!(resp[1].1, 0xbadcafe);
        assert!(matches!(resp[1].0, VmResponse::Ok));

        // The adjust was forwarded on the tube as part of that `recv`.
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));
    }

    /// A second waiting adjust overrides the first: the first key gets an error
    /// string, and only an `Adjusted` result matching the second target
    /// completes the second key.
    #[test]
    fn test_adjust_with_reply() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);

        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0xc0ffee,
                wait_for_success: true,
            },
            Some(0xc0ffee),
        );
        assert!(resp.is_none());
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));

        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0xbadcafe,
                wait_for_success: true,
            },
            Some(0xbadcafe),
        );
        // The response carries the key of the overridden (first) request.
        assert!(matches!(resp, Some((VmResponse::ErrString(_), 0xc0ffee))));
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));

        // Reported size equals the first, overridden target: no response.
        device
            .send(&BalloonTubeResult::Adjusted {
                num_bytes: 0xc0ffee,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 0);

        // Reported size equals the current target: the second request completes.
        device
            .send(&BalloonTubeResult::Adjusted {
                num_bytes: 0xbadcafe,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xbadcafe);
        assert!(matches!(resp[0].0, VmResponse::Ok));
    }

    /// A waiting adjust is not queued behind an outstanding stats request, and
    /// its result can be received before the stats result.
    #[test]
    fn test_stats_and_adjust_with_reply() {
        let (host, device) = Tube::pair().unwrap();
        let mut balloon_tube = BalloonTube::new(host);

        let resp = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(resp.is_none());

        let resp = balloon_tube.send_cmd(
            BalloonControlCommand::Adjust {
                num_bytes: 0xbadcafe,
                wait_for_success: true,
            },
            Some(0xbadcafe),
        );
        assert!(resp.is_none());

        // Both commands are already on the tube, in the order they were issued.
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Stats));
        let cmd = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(cmd, BalloonTubeCommand::Adjust { .. }));

        // The adjust result is handled without touching the queued stats request.
        device
            .send(&BalloonTubeResult::Adjusted {
                num_bytes: 0xbadcafe,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xbadcafe);
        assert!(matches!(resp[0].0, VmResponse::Ok));

        device
            .send(&BalloonTubeResult::Stats {
                stats: BalloonStats::default(),
                balloon_actual: 0,
            })
            .unwrap();
        let resp = balloon_tube.recv().unwrap();
        assert_eq!(resp.len(), 1);
        assert_eq!(resp[0].1, 0xc0ffee);
        assert!(matches!(resp[0].0, VmResponse::BalloonStats { .. }));
    }
}
385