1 // Copyright 2023 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Balloon related control APIs.
6
7 use std::collections::VecDeque;
8
9 use anyhow::bail;
10 use anyhow::Context;
11 use anyhow::Result;
12 use balloon_control::BalloonTubeCommand;
13 use balloon_control::BalloonTubeResult;
14 use base::error;
15 use base::Error as SysError;
16 use base::Tube;
17 use serde::Deserialize;
18 use serde::Serialize;
19
20 use crate::VmResponse;
21
22 // Balloon commands that are sent on the crosvm control socket.
23 #[derive(Serialize, Deserialize, Debug, Clone)]
24 pub enum BalloonControlCommand {
25 /// Set the size of the VM's balloon.
26 Adjust {
27 num_bytes: u64,
28 wait_for_success: bool,
29 },
30 Stats,
31 WorkingSet,
32 WorkingSetConfig {
33 bins: Vec<u32>,
34 refresh_threshold: u32,
35 report_threshold: u32,
36 },
37 }
38
do_send(tube: &Tube, cmd: &BalloonControlCommand) -> Option<VmResponse>39 fn do_send(tube: &Tube, cmd: &BalloonControlCommand) -> Option<VmResponse> {
40 match *cmd {
41 BalloonControlCommand::Adjust {
42 num_bytes,
43 wait_for_success,
44 } => {
45 match tube.send(&BalloonTubeCommand::Adjust {
46 num_bytes,
47 allow_failure: wait_for_success,
48 }) {
49 Ok(_) => {
50 if wait_for_success {
51 None
52 } else {
53 Some(VmResponse::Ok)
54 }
55 }
56 Err(_) => Some(VmResponse::Err(SysError::last())),
57 }
58 }
59 BalloonControlCommand::WorkingSetConfig {
60 ref bins,
61 refresh_threshold,
62 report_threshold,
63 } => {
64 match tube.send(&BalloonTubeCommand::WorkingSetConfig {
65 bins: bins.clone(),
66 refresh_threshold,
67 report_threshold,
68 }) {
69 Ok(_) => Some(VmResponse::Ok),
70 Err(_) => Some(VmResponse::Err(SysError::last())),
71 }
72 }
73 BalloonControlCommand::Stats => match tube.send(&BalloonTubeCommand::Stats) {
74 Ok(_) => None,
75 Err(_) => Some(VmResponse::Err(SysError::last())),
76 },
77 BalloonControlCommand::WorkingSet => match tube.send(&BalloonTubeCommand::WorkingSet) {
78 Ok(_) => None,
79 Err(_) => Some(VmResponse::Err(SysError::last())),
80 },
81 }
82 }
83
84 /// Utility for multiplexing a balloon tube between multiple control tubes. Commands
85 /// are sent and processed serially.
86 pub struct BalloonTube {
87 tube: Tube,
88 pending_queue: VecDeque<(BalloonControlCommand, Option<usize>)>,
89 pending_adjust_with_completion: Option<(u64, usize)>,
90 }
91
92 impl BalloonTube {
new(tube: Tube) -> Self93 pub fn new(tube: Tube) -> Self {
94 BalloonTube {
95 tube,
96 pending_queue: VecDeque::new(),
97 pending_adjust_with_completion: None,
98 }
99 }
100
101 /// Sends or queues the given command to this tube. Associates the
102 /// response with the given key.
send_cmd( &mut self, cmd: BalloonControlCommand, key: Option<usize>, ) -> Option<(VmResponse, usize)>103 pub fn send_cmd(
104 &mut self,
105 cmd: BalloonControlCommand,
106 key: Option<usize>,
107 ) -> Option<(VmResponse, usize)> {
108 match cmd {
109 BalloonControlCommand::Adjust {
110 wait_for_success: true,
111 num_bytes,
112 } => {
113 let Some(key) = key else {
114 error!("Asked for completion without reply key");
115 return None;
116 };
117 let resp = self
118 .pending_adjust_with_completion
119 .take()
120 .map(|(_, key)| (VmResponse::ErrString("Adjust overriden".to_string()), key));
121 if do_send(&self.tube, &cmd).is_some() {
122 unreachable!("Unexpected early reply");
123 }
124 self.pending_adjust_with_completion = Some((num_bytes, key));
125 resp
126 }
127 _ => {
128 if !self.pending_queue.is_empty() {
129 self.pending_queue.push_back((cmd, key));
130 return None;
131 }
132 let resp = do_send(&self.tube, &cmd);
133 if resp.is_none() {
134 self.pending_queue.push_back((cmd, key));
135 }
136 match key {
137 None => None,
138 Some(key) => resp.map(|r| (r, key)),
139 }
140 }
141 }
142 }
143
144 /// Receives responses from the balloon tube, and returns them with
145 /// their assoicated keys.
recv(&mut self) -> Result<Vec<(VmResponse, usize)>>146 pub fn recv(&mut self) -> Result<Vec<(VmResponse, usize)>> {
147 let res = self
148 .tube
149 .recv::<BalloonTubeResult>()
150 .context("failed to read balloon tube")?;
151 if let BalloonTubeResult::Adjusted { num_bytes: actual } = res {
152 let Some((target, key)) = self.pending_adjust_with_completion else {
153 bail!("Unexpected balloon adjust to {}", actual);
154 };
155 if actual != target {
156 return Ok(vec![]);
157 }
158 self.pending_adjust_with_completion.take();
159 return Ok(vec![(VmResponse::Ok, key)]);
160 }
161 let mut responses = vec![];
162 if self.pending_queue.is_empty() {
163 bail!("Unexpected balloon tube result {:?}", res)
164 }
165 let resp = match (
166 &self.pending_queue.front().expect("entry disappeared").0,
167 res,
168 ) {
169 (
170 BalloonControlCommand::Stats,
171 BalloonTubeResult::Stats {
172 stats,
173 balloon_actual,
174 },
175 ) => VmResponse::BalloonStats {
176 stats,
177 balloon_actual,
178 },
179 (
180 BalloonControlCommand::WorkingSet,
181 BalloonTubeResult::WorkingSet { ws, balloon_actual },
182 ) => VmResponse::BalloonWS { ws, balloon_actual },
183 (_, resp) => {
184 bail!("Unexpected balloon tube result {:?}", resp);
185 }
186 };
187 let key = self.pending_queue.pop_front().expect("entry disappeared").1;
188 if let Some(key) = key {
189 responses.push((resp, key))
190 }
191 while let Some((cmd, key)) = self.pending_queue.front() {
192 match do_send(&self.tube, cmd) {
193 Some(resp) => {
194 if let Some(key) = key {
195 responses.push((resp, *key));
196 }
197 self.pending_queue.pop_front();
198 }
199 None => break,
200 }
201 }
202 Ok(responses)
203 }
204 }
205
#[cfg(test)]
mod tests {
    use balloon_control::BalloonStats;

    use super::*;

    /// Builds a `BalloonTube` on one end of a fresh tube pair and returns it
    /// together with the device-side end.
    fn connected_pair() -> (BalloonTube, Tube) {
        let (host, device) = Tube::pair().unwrap();
        (BalloonTube::new(host), device)
    }

    /// Plays the balloon device: consumes one command, which must be `Stats`,
    /// and answers it with default stats and a zero balloon size.
    fn balloon_device_respond_stats(device: &Tube) {
        match device.recv::<BalloonTubeCommand>().unwrap() {
            BalloonTubeCommand::Stats => {}
            _ => panic!("unexpected command"),
        }

        let reply = BalloonTubeResult::Stats {
            stats: BalloonStats::default(),
            balloon_actual: 0,
        };
        device.send(&reply).unwrap();
    }

    /// Consumes one command on the device side and checks it is an `Adjust`.
    fn device_expect_adjust(device: &Tube) {
        let received = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(received, BalloonTubeCommand::Adjust { .. }));
    }

    /// Has the device report that the balloon was adjusted to `num_bytes`.
    fn device_report_adjusted(device: &Tube, num_bytes: u64) {
        device
            .send(&BalloonTubeResult::Adjusted { num_bytes })
            .unwrap();
    }

    /// Checks that `replies` holds exactly one stats reply tagged with `key`.
    fn assert_single_stats(replies: &[(VmResponse, usize)], key: usize) {
        assert_eq!(replies.len(), 1);
        assert_eq!(replies[0].1, key);
        assert!(matches!(replies[0].0, VmResponse::BalloonStats { .. }));
    }

    /// Checks that `replies` holds exactly one `Ok` reply tagged with `key`.
    fn assert_single_ok(replies: &[(VmResponse, usize)], key: usize) {
        assert_eq!(replies.len(), 1);
        assert_eq!(replies[0].1, key);
        assert!(matches!(replies[0].0, VmResponse::Ok));
    }

    #[test]
    fn test_stat_command() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(immediate.is_none());

        balloon_device_respond_stats(&device);

        let replies = balloon_tube.recv().unwrap();
        assert_single_stats(&replies, 0xc0ffee);
    }

    #[test]
    fn test_multiple_stat_command() {
        let (mut balloon_tube, device) = connected_pair();

        // Neither request is answered at send time; the second is queued.
        for key in [0xc0ffee, 0xbadcafe] {
            let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(key));
            assert!(immediate.is_none());
        }

        // Each device reply resolves exactly one request, in order.
        for key in [0xc0ffee, 0xbadcafe] {
            balloon_device_respond_stats(&device);

            let replies = balloon_tube.recv().unwrap();
            assert_single_stats(&replies, key);
        }
    }

    #[test]
    fn test_queued_stats_adjust_no_reply() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(immediate.is_none());

        // Queued behind the stats request, so no reply yet.
        let fire_and_forget = BalloonControlCommand::Adjust {
            num_bytes: 0,
            wait_for_success: false,
        };
        let immediate = balloon_tube.send_cmd(fire_and_forget, Some(0xbadcafe));
        assert!(immediate.is_none());

        balloon_device_respond_stats(&device);

        // The stats reply also flushes the queued adjust.
        let replies = balloon_tube.recv().unwrap();
        assert_eq!(replies.len(), 2);
        assert_eq!(replies[0].1, 0xc0ffee);
        assert!(matches!(replies[0].0, VmResponse::BalloonStats { .. }));
        assert_eq!(replies[1].1, 0xbadcafe);
        assert!(matches!(replies[1].0, VmResponse::Ok));

        device_expect_adjust(&device);
    }

    #[test]
    fn test_adjust_with_reply() {
        let (mut balloon_tube, device) = connected_pair();

        let first = BalloonControlCommand::Adjust {
            num_bytes: 0xc0ffee,
            wait_for_success: true,
        };
        let immediate = balloon_tube.send_cmd(first, Some(0xc0ffee));
        assert!(immediate.is_none());
        device_expect_adjust(&device);

        // A second waiting adjust overrides the first, which gets an error.
        let second = BalloonControlCommand::Adjust {
            num_bytes: 0xbadcafe,
            wait_for_success: true,
        };
        let overridden = balloon_tube.send_cmd(second, Some(0xbadcafe));
        assert!(matches!(
            overridden,
            Some((VmResponse::ErrString(_), 0xc0ffee))
        ));
        device_expect_adjust(&device);

        // A report for the overridden target produces nothing.
        device_report_adjusted(&device, 0xc0ffee);
        let replies = balloon_tube.recv().unwrap();
        assert_eq!(replies.len(), 0);

        // A report for the current target completes the second adjust.
        device_report_adjusted(&device, 0xbadcafe);
        let replies = balloon_tube.recv().unwrap();
        assert_single_ok(&replies, 0xbadcafe);
    }

    #[test]
    fn test_stats_and_adjust_with_reply() {
        let (mut balloon_tube, device) = connected_pair();

        let immediate = balloon_tube.send_cmd(BalloonControlCommand::Stats, Some(0xc0ffee));
        assert!(immediate.is_none());

        let waiting_adjust = BalloonControlCommand::Adjust {
            num_bytes: 0xbadcafe,
            wait_for_success: true,
        };
        let immediate = balloon_tube.send_cmd(waiting_adjust, Some(0xbadcafe));
        assert!(immediate.is_none());

        // Both commands reach the device even though stats is unanswered.
        let received = device.recv::<BalloonTubeCommand>().unwrap();
        assert!(matches!(received, BalloonTubeCommand::Stats));
        device_expect_adjust(&device);

        // The adjust completes first, independently of the pending stats.
        device_report_adjusted(&device, 0xbadcafe);
        let replies = balloon_tube.recv().unwrap();
        assert_single_ok(&replies, 0xbadcafe);

        let stats_reply = BalloonTubeResult::Stats {
            stats: BalloonStats::default(),
            balloon_actual: 0,
        };
        device.send(&stats_reply).unwrap();
        let replies = balloon_tube.recv().unwrap();
        assert_single_stats(&replies, 0xc0ffee);
    }
}
385