xref: /aosp_15_r20/external/crosvm/swap/src/controller.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
//! Controller for the vmm-swap feature: launches the swap monitor process and sends it commands.
6 
7 #![deny(missing_docs)]
8 
9 use std::fs::File;
10 use std::fs::OpenOptions;
11 use std::io::stderr;
12 use std::io::stdout;
13 use std::ops::Range;
14 use std::os::unix::fs::OpenOptionsExt;
15 use std::path::Path;
16 use std::thread::Scope;
17 use std::thread::ScopedJoinHandle;
18 use std::time::Duration;
19 use std::time::Instant;
20 
21 use anyhow::bail;
22 use anyhow::Context;
23 use base::debug;
24 use base::error;
25 use base::info;
26 use base::linux::FileDataIterator;
27 use base::syslog;
28 use base::warn;
29 use base::AsRawDescriptor;
30 use base::AsRawDescriptors;
31 use base::EventToken;
32 use base::RawDescriptor;
33 use base::SendTube;
34 use base::SharedMemory;
35 use base::Tube;
36 use base::TubeError;
37 use base::WaitContext;
38 use jail::create_base_minijail;
39 use jail::create_sandbox_minijail;
40 use jail::fork::fork_process;
41 use jail::fork::Child;
42 use jail::JailConfig;
43 use jail::SandboxConfig;
44 use jail::MAX_OPEN_FILES_DEFAULT;
45 use once_cell::sync::Lazy;
46 use serde::Deserialize;
47 use serde::Serialize;
48 use sync::Mutex;
49 use vm_memory::GuestMemory;
50 
51 use crate::file_truncator::FileTruncator;
52 use crate::page_handler::Error as PageHandlerError;
53 use crate::page_handler::MoveToStaging;
54 use crate::page_handler::PageHandler;
55 use crate::page_handler::MLOCK_BUDGET;
56 use crate::pagesize::bytes_to_pages;
57 use crate::pagesize::THP_SIZE;
58 use crate::processes::freeze_child_processes;
59 use crate::processes::ProcessesGuard;
60 use crate::uffd_list::Token as UffdListToken;
61 use crate::uffd_list::UffdList;
62 use crate::userfaultfd::register_regions;
63 use crate::userfaultfd::unregister_regions;
64 use crate::userfaultfd::DeadUffdCheckerImpl;
65 use crate::userfaultfd::Error as UffdError;
66 use crate::userfaultfd::Factory as UffdFactory;
67 use crate::userfaultfd::UffdEvent;
68 use crate::userfaultfd::Userfaultfd;
69 use crate::worker::BackgroundJobControl;
70 use crate::worker::Worker;
71 use crate::SwapMetrics;
72 use crate::SwapState;
73 use crate::SwapStateTransition;
74 use crate::SwapStatus;
75 
76 /// The max size of chunks to swap out/in at once.
77 const MAX_SWAP_CHUNK_SIZE: usize = 2 * 1024 * 1024; // = 2MB
78 /// The max pages to trim at once.
79 const MAX_TRIM_PAGES: usize = 1024;
80 
81 /// Returns count of pages active on the guest memory.
count_resident_pages(guest_memory: &GuestMemory) -> usize82 fn count_resident_pages(guest_memory: &GuestMemory) -> usize {
83     let mut pages = 0;
84     for region in guest_memory.regions() {
85         let mut resident_bytes = 0u64;
86         for range in FileDataIterator::new(region.shm, region.shm_offset, region.size as u64) {
87             let range = match range {
88                 Ok(r) => r,
89                 Err(e) => {
90                     error!("failed to iterate data ranges: {e:?}");
91                     return 0;
92                 }
93             };
94             resident_bytes += range.end - range.start;
95         }
96         let resident_bytes = match resident_bytes.try_into() {
97             Ok(n) => n,
98             Err(e) => {
99                 error!("failed to load resident pages count: {:?}", e);
100                 return 0;
101             }
102         };
103 
104         pages += bytes_to_pages(resident_bytes);
105     }
106     pages
107 }
108 
109 /// Commands used in vmm-swap feature internally sent to the monitor process from the main and other
110 /// processes.
111 ///
112 /// This is mainly originated from the `crosvm swap <command>` command line.
113 #[derive(Serialize, Deserialize)]
114 enum Command {
115     Enable,
116     Trim,
117     SwapOut,
118     Disable {
119         slow_file_cleanup: bool,
120     },
121     Exit,
122     Status,
123     ProcessForked {
124         #[serde(with = "base::with_as_descriptor")]
125         uffd: Userfaultfd,
126         reply_tube: Tube,
127     },
128     StaticDeviceSetupComplete(u32),
129 }
130 
131 /// [SwapController] provides APIs to control vmm-swap.
132 pub struct SwapController {
133     child_process: Option<Child>,
134     uffd_factory: UffdFactory,
135     command_tube: Tube,
136     num_static_devices: u32,
137     // Keep 1 page dummy mmap in the main process to make it present in all the descendant
138     // processes.
139     _dead_uffd_checker: DeadUffdCheckerImpl,
140     // Keep the cloned [GuestMemory] in the main process not to free it before the monitor process
141     // exits.
142     _guest_memory: GuestMemory,
143 }
144 
145 impl SwapController {
146     /// Launch a monitor process for vmm-swap and return a controller.
147     ///
148     /// Pages on the [GuestMemory] are registered to userfaultfd to track pagefault events.
149     ///
150     /// # Arguments
151     ///
152     /// * `guest_memory` - fresh new [GuestMemory]. Any pages on the [GuestMemory] must not be
153     ///   touched.
154     /// * `swap_dir` - directory to store swap files.
launch( guest_memory: GuestMemory, swap_dir: &Path, jail_config: &Option<JailConfig>, ) -> anyhow::Result<Self>155     pub fn launch(
156         guest_memory: GuestMemory,
157         swap_dir: &Path,
158         jail_config: &Option<JailConfig>,
159     ) -> anyhow::Result<Self> {
160         info!("vmm-swap is enabled. launch monitor process.");
161 
162         let preserved_guest_memory = guest_memory.clone();
163 
164         let uffd_factory = UffdFactory::new();
165         let uffd = uffd_factory.create().context("create userfaultfd")?;
166 
167         // The swap file is created as `O_TMPFILE` from the specified directory. As benefits:
168         //
169         // * it has no chance to conflict.
170         // * it has a security benefit that no one (except root) can access the swap file.
171         // * it will be automatically deleted by the kernel when crosvm exits/dies or on reboot if
172         //   the device panics/hard-resets while crosvm is running.
173         let swap_file = OpenOptions::new()
174             .read(true)
175             .write(true)
176             .custom_flags(libc::O_TMPFILE | libc::O_EXCL)
177             .mode(0o000) // other processes with the same uid can't open the file
178             .open(swap_dir)?;
179         // The internal tube in which [Command]s sent from other processes than the monitor process
180         // to the monitor process. The response is `Status` only.
181         let (command_tube_main, command_tube_monitor) =
182             Tube::pair().context("create swap command tube")?;
183 
184         // Allocate eventfd before creating sandbox.
185         let bg_job_control = BackgroundJobControl::new().context("create background job event")?;
186 
187         let dead_uffd_checker = DeadUffdCheckerImpl::new().context("create dead uffd checker")?;
188 
189         let mut keep_rds = vec![
190             stdout().as_raw_descriptor(),
191             stderr().as_raw_descriptor(),
192             uffd.as_raw_descriptor(),
193             swap_file.as_raw_descriptor(),
194             command_tube_monitor.as_raw_descriptor(),
195             bg_job_control.get_completion_event().as_raw_descriptor(),
196         ];
197 
198         syslog::push_descriptors(&mut keep_rds);
199         cros_tracing::push_descriptors!(&mut keep_rds);
200         metrics::push_descriptors(&mut keep_rds);
201         keep_rds.extend(guest_memory.as_raw_descriptors());
202 
203         keep_rds.extend(uffd_factory.as_raw_descriptors());
204 
205         // Load and cache transparent hugepage size from sysfs before jumping into sandbox.
206         Lazy::force(&THP_SIZE);
207 
208         let mut jail = if let Some(jail_config) = jail_config {
209             let config = SandboxConfig::new(jail_config, "swap_monitor");
210             create_sandbox_minijail(&jail_config.pivot_root, MAX_OPEN_FILES_DEFAULT, &config)
211                 .context("create sandbox jail")?
212         } else {
213             create_base_minijail(Path::new("/"), MAX_OPEN_FILES_DEFAULT)
214                 .context("create minijail")?
215         };
216         jail.set_rlimit(
217             libc::RLIMIT_MEMLOCK as libc::c_int,
218             MLOCK_BUDGET as u64,
219             MLOCK_BUDGET as u64,
220         )
221         .context("error setting RLIMIT_MEMLOCK")?;
222 
223         // Start a page fault monitoring process (this will be the first child process of the
224         // current process)
225         let child_process =
226             fork_process(jail, keep_rds, Some(String::from("swap monitor")), || {
227                 if let Err(e) = monitor_process(
228                     command_tube_monitor,
229                     guest_memory,
230                     uffd,
231                     swap_file,
232                     bg_job_control,
233                     &dead_uffd_checker,
234                 ) {
235                     if let Some(PageHandlerError::Userfaultfd(UffdError::UffdClosed)) =
236                         e.downcast_ref::<PageHandlerError>()
237                     {
238                         // Userfaultfd can cause UffdError::UffdClosed if the main process
239                         // unexpectedly while it is swapping in. This is not a bug of swap monitor,
240                         // but the other feature on the main process.
241                         // Note that UffdError::UffdClosed from other processes than the main
242                         // process are derived from PageHandler::handle_page_fault() only and
243                         // handled in the loop of handle_vmm_swap().
244                         error!(
245                             "page_fault_handler_thread exited with userfaultfd closed error: {:#}",
246                             e
247                         );
248                     } else if e.is::<TubeError>() {
249                         // Tube can cause TubeError if the main process unexpectedly dies. This is
250                         // not a bug of swap monitor, but the other feature on the main process.
251                         // Even if the tube itself is broken and the main process is alive, the main
252                         // process catch that the swap monitor process exits unexpectedly and
253                         // terminates itself.
254                         error!("page_fault_handler_thread exited with tube error: {:#}", e);
255                     } else {
256                         panic!("page_fault_handler_thread exited with error: {:#}", e);
257                     }
258                 }
259             })
260             .context("fork monitor process")?;
261 
262         // send first status request to the monitor process and wait for the response until setup on
263         // the monitor process completes.
264         command_tube_main.send(&Command::Status)?;
265         match command_tube_main
266             .recv::<SwapStatus>()
267             .context("recv initial status")?
268             .state
269         {
270             SwapState::Ready => {
271                 // The initial state of swap status is Ready and this is a signal that the
272                 // monitoring process completes setup and is running.
273             }
274             status => {
275                 bail!("initial state is not Ready, but {:?}", status);
276             }
277         };
278 
279         Ok(Self {
280             child_process: Some(child_process),
281             uffd_factory,
282             command_tube: command_tube_main,
283             num_static_devices: 0,
284             _dead_uffd_checker: dead_uffd_checker,
285             _guest_memory: preserved_guest_memory,
286         })
287     }
288 
289     /// Enable monitoring page faults and move guest memory to staging memory.
290     ///
291     /// The pages will be swapped in from the staging memory to the guest memory on page faults
292     /// until pages are written into the swap file by [Self::swap_out()].
293     ///
294     /// This waits until enabling vmm-swap finishes on the monitor process.
295     ///
296     /// The caller must guarantee that any contents on the guest memory is not updated during
297     /// enabling vmm-swap.
298     ///
299     /// # Note
300     ///
301     /// Enabling does not write pages to the swap file. User should call [Self::swap_out()]
302     /// after a suitable time.
303     ///
304     /// Just after enabling vmm-swap, some amount of pages are swapped in as soon as guest resumes.
305     /// By splitting the enable/swap_out operation and by delaying write to the swap file operation,
306     /// it has a benefit of reducing file I/O for hot pages.
enable(&self) -> anyhow::Result<()>307     pub fn enable(&self) -> anyhow::Result<()> {
308         self.command_tube
309             .send(&Command::Enable)
310             .context("send swap enable request")?;
311 
312         let _ = self
313             .command_tube
314             .recv::<SwapStatus>()
315             .context("receive swap status")?;
316         Ok(())
317     }
318 
319     /// Trim pages in the staging memory which are needless to be written back to the swap file.
320     ///
321     /// * zero pages
322     /// * pages which are the same as the pages in the swap file.
trim(&self) -> anyhow::Result<()>323     pub fn trim(&self) -> anyhow::Result<()> {
324         self.command_tube
325             .send(&Command::Trim)
326             .context("send swap trim request")?;
327         Ok(())
328     }
329 
330     /// Swap out all the pages in the staging memory to the swap files.
331     ///
332     /// This returns as soon as it succeeds to send request to the monitor process.
333     ///
334     /// Users should call [Self::enable()] before this. See the comment of [Self::enable()] as well.
swap_out(&self) -> anyhow::Result<()>335     pub fn swap_out(&self) -> anyhow::Result<()> {
336         self.command_tube
337             .send(&Command::SwapOut)
338             .context("send swap out request")?;
339         Ok(())
340     }
341 
342     /// Swap in all the guest memory and disable monitoring page faults.
343     ///
344     /// This returns as soon as it succeeds to send request to the monitor process.
disable(&self, slow_file_cleanup: bool) -> anyhow::Result<()>345     pub fn disable(&self, slow_file_cleanup: bool) -> anyhow::Result<()> {
346         self.command_tube
347             .send(&Command::Disable { slow_file_cleanup })
348             .context("send swap disable request")?;
349         Ok(())
350     }
351 
352     /// Return current swap status.
353     ///
354     /// This blocks until response from the monitor process arrives to the main process.
status(&self) -> anyhow::Result<SwapStatus>355     pub fn status(&self) -> anyhow::Result<SwapStatus> {
356         self.command_tube
357             .send(&Command::Status)
358             .context("send swap status request")?;
359         let status = self.command_tube.recv().context("receive swap status")?;
360         Ok(status)
361     }
362 
363     /// Suspend device processes using `SIGSTOP` signal.
364     ///
365     /// When the returned `ProcessesGuard` is dropped, the devices resume.
366     ///
367     /// This must be called from the main process.
suspend_devices(&self) -> anyhow::Result<ProcessesGuard>368     pub fn suspend_devices(&self) -> anyhow::Result<ProcessesGuard> {
369         // child_process become none on dropping SwapController.
370         freeze_child_processes(
371             self.child_process
372                 .as_ref()
373                 .expect("monitor process not exist")
374                 .pid,
375         )
376     }
377 
378     /// Notify the monitor process that all static devices are forked.
379     ///
380     /// Devices forked after this call are treated as dynamic devices which can die (e.g. hotplug
381     /// devices).
on_static_devices_setup_complete(&self) -> anyhow::Result<()>382     pub fn on_static_devices_setup_complete(&self) -> anyhow::Result<()> {
383         // This sends the number of static devices counted on the main process because device
384         // initializations are executed on child processes asynchronously.
385         self.command_tube
386             .send(&Command::StaticDeviceSetupComplete(self.num_static_devices))
387             .context("send command")
388     }
389 
390     /// Create [SwapDeviceHelper].
create_device_helper(&self) -> anyhow::Result<SwapDeviceHelper>391     pub fn create_device_helper(&self) -> anyhow::Result<SwapDeviceHelper> {
392         let uffd_factory = self
393             .uffd_factory
394             .try_clone()
395             .context("try clone uffd factory")?;
396         let command_tube = self
397             .command_tube
398             .try_clone_send_tube()
399             .context("try clone tube")?;
400         Ok(SwapDeviceHelper {
401             uffd_factory,
402             command_tube,
403         })
404     }
405 }
406 
407 impl Drop for SwapController {
drop(&mut self)408     fn drop(&mut self) {
409         // Shutdown the monitor process.
410         // This blocks until the monitor process exits.
411         if let Err(e) = self.command_tube.send(&Command::Exit) {
412             error!(
413                 "failed to sent exit command to vmm-swap monitor process: {:#}",
414                 e
415             );
416             return;
417         }
418         if let Err(e) = self
419             .child_process
420             .take()
421             .expect("monitor process not exist")
422             .wait()
423         {
424             error!("failed to wait vmm-swap monitor process shutdown: {:#}", e);
425         }
426     }
427 }
428 
429 /// Create a new [SwapDeviceUffdSender] which is passed to the forked child process.
430 pub trait PrepareFork {
431     /// Create a new [SwapDeviceUffdSender].
prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender>432     fn prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender>;
433 }
434 
435 impl PrepareFork for SwapController {
436     /// Create a new [SwapDeviceUffdSender].
437     ///
438     /// This should be called from the main process because creating a [Tube]s requires seccomp
439     /// policy.
440     ///
441     /// This also counts the number of static devices which are created before booting.
prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender>442     fn prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender> {
443         let command_tube = self
444             .command_tube
445             .try_clone_send_tube()
446             .context("try clone tube")?;
447         self.num_static_devices += 1;
448         SwapDeviceUffdSender::new(command_tube, &self.uffd_factory)
449     }
450 }
451 
452 /// Helper to create [SwapDeviceUffdSender] from child processes (e.g. JailWarden for hotplug
453 /// devices).
454 pub struct SwapDeviceHelper {
455     uffd_factory: UffdFactory,
456     command_tube: SendTube,
457 }
458 
459 impl PrepareFork for SwapDeviceHelper {
460     /// Create a new [SwapDeviceUffdSender].
prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender>461     fn prepare_fork(&mut self) -> anyhow::Result<SwapDeviceUffdSender> {
462         let command_tube = self.command_tube.try_clone().context("try clone tube")?;
463         SwapDeviceUffdSender::new(command_tube, &self.uffd_factory)
464     }
465 }
466 
467 impl AsRawDescriptors for SwapDeviceHelper {
as_raw_descriptors(&self) -> Vec<RawDescriptor>468     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
469         let mut rds = self.uffd_factory.as_raw_descriptors();
470         rds.push(self.command_tube.as_raw_descriptor());
471         rds
472     }
473 }
474 
475 /// Create a new userfaultfd and send it to the monitor process.
476 pub struct SwapDeviceUffdSender {
477     uffd_factory: UffdFactory,
478     command_tube: SendTube,
479     sender: Tube,
480     receiver: Tube,
481 }
482 
483 impl SwapDeviceUffdSender {
new(command_tube: SendTube, uffd_factory: &UffdFactory) -> anyhow::Result<Self>484     fn new(command_tube: SendTube, uffd_factory: &UffdFactory) -> anyhow::Result<Self> {
485         let uffd_factory = uffd_factory.try_clone().context("try clone uffd factory")?;
486         let (sender, receiver) = Tube::pair().context("create tube")?;
487         receiver
488             .set_recv_timeout(Some(Duration::from_secs(60)))
489             .context("set recv timeout")?;
490         Ok(SwapDeviceUffdSender {
491             uffd_factory,
492             command_tube,
493             sender,
494             receiver,
495         })
496     }
497 
498     /// Create a new userfaultfd and send it to the monitor process.
499     ///
500     /// This must be called as soon as a child process which may touch the guest memory is forked.
501     ///
502     /// Userfaultfd(2) originally has `UFFD_FEATURE_EVENT_FORK`. But it is not applicable to crosvm
503     /// since it does not support non-root user namespace.
on_process_forked(self) -> anyhow::Result<()>504     pub fn on_process_forked(self) -> anyhow::Result<()> {
505         let uffd = self.uffd_factory.create().context("create userfaultfd")?;
506         // The fd for Userfaultfd in this process is dropped when it is sent via Tube, but the
507         // userfaultfd keeps alive in the monitor process which it is sent to.
508         self.command_tube
509             .send(&Command::ProcessForked {
510                 uffd,
511                 reply_tube: self.sender,
512             })
513             .context("send forked event")?;
514         // Wait to proceeds the child process logic until the userfaultfd is set up.
515         if !self.receiver.recv::<bool>().context("recv tube")? {
516             bail!("failed to register a new userfaultfd");
517         }
518         Ok(())
519     }
520 }
521 
522 impl AsRawDescriptors for SwapDeviceUffdSender {
as_raw_descriptors(&self) -> Vec<RawDescriptor>523     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
524         let mut rds = self.uffd_factory.as_raw_descriptors();
525         rds.push(self.command_tube.as_raw_descriptor());
526         rds.push(self.sender.as_raw_descriptor());
527         rds.push(self.receiver.as_raw_descriptor());
528         rds
529     }
530 }
531 
532 #[derive(EventToken, Clone, Copy)]
533 enum Token {
534     UffdEvents(u32),
535     Command,
536     BackgroundJobCompleted,
537 }
538 
539 impl UffdListToken for Token {
uffd_token(idx: u32) -> Self540     fn uffd_token(idx: u32) -> Self {
541         Token::UffdEvents(idx)
542     }
543 }
544 
regions_from_guest_memory(guest_memory: &GuestMemory) -> Vec<Range<usize>>545 fn regions_from_guest_memory(guest_memory: &GuestMemory) -> Vec<Range<usize>> {
546     guest_memory
547         .regions()
548         .map(|region| region.host_addr..(region.host_addr + region.size))
549         .collect()
550 }
551 
552 /// The main thread of the monitor process.
monitor_process( command_tube: Tube, guest_memory: GuestMemory, uffd: Userfaultfd, swap_file: File, bg_job_control: BackgroundJobControl, dead_uffd_checker: &DeadUffdCheckerImpl, ) -> anyhow::Result<()>553 fn monitor_process(
554     command_tube: Tube,
555     guest_memory: GuestMemory,
556     uffd: Userfaultfd,
557     swap_file: File,
558     bg_job_control: BackgroundJobControl,
559     dead_uffd_checker: &DeadUffdCheckerImpl,
560 ) -> anyhow::Result<()> {
561     info!("monitor_process started");
562 
563     let wait_ctx = WaitContext::build_with(&[
564         (&command_tube, Token::Command),
565         (
566             bg_job_control.get_completion_event(),
567             Token::BackgroundJobCompleted,
568         ),
569     ])
570     .context("create wait context")?;
571 
572     let mut swap_file_opt = Some(swap_file);
573     let mut truncate_worker: Option<FileTruncator> = None;
574 
575     let n_worker = num_cpus::get();
576     info!("start {} workers for staging memory move", n_worker);
577     // The worker threads are killed when the main thread of the monitor process dies.
578     let worker = Worker::new(n_worker, n_worker);
579 
580     let mut uffd_list =
581         UffdList::new(uffd, dead_uffd_checker, &wait_ctx).context("create uffd list")?;
582     let mut state_transition = SwapStateTransition::default();
583     let mut try_gc_uffds = false;
584 
585     loop {
586         let events = wait_ctx.wait().context("wait poll events")?;
587 
588         for event in events.iter() {
589             match event.token {
590                 Token::UffdEvents(id_uffd) => {
591                     let uffd = uffd_list
592                         .get(id_uffd)
593                         .with_context(|| format!("uffd is not found for idx: {}", id_uffd))?;
594                     // Userfaultfd does not work as level triggered but as edge triggered. We need
595                     // to read all the events in the userfaultfd here.
596                     while let Some(event) = uffd.read_event().context("read userfaultfd event")? {
597                         match event {
598                             UffdEvent::Remove { .. } => {
599                                 // BUG(b/272620051): This is a bug of userfaultfd that
600                                 // UFFD_EVENT_REMOVE can be read even after unregistering memory
601                                 // from the userfaultfd.
602                                 warn!("page remove event while vmm-swap disabled");
603                             }
604                             event => {
605                                 bail!("unexpected uffd event: {:?}", event);
606                             }
607                         }
608                     }
609                 }
610                 Token::Command => match command_tube
611                     .recv::<Command>()
612                     .context("recv swap command")?
613                 {
614                     Command::ProcessForked { uffd, reply_tube } => {
615                         debug!("new fork uffd: {:?}", uffd);
616                         let result = match uffd_list.register(uffd) {
617                             Ok(is_dynamic_uffd) => {
618                                 try_gc_uffds = is_dynamic_uffd;
619                                 true
620                             }
621                             Err(e) => {
622                                 error!("failed to register uffd to list: {:?}", e);
623                                 false
624                             }
625                         };
626                         if let Err(e) = reply_tube.send(&result) {
627                             error!("failed to response to new process: {:?}", e);
628                         }
629                     }
630                     Command::StaticDeviceSetupComplete(num_static_devices) => {
631                         info!("static device setup complete: n={}", num_static_devices);
632                         if !uffd_list.set_num_static_devices(num_static_devices) {
633                             bail!("failed to set num_static_devices");
634                         }
635                     }
636                     Command::Enable => {
637                         info!("enabling vmm-swap");
638 
639                         let staging_shmem =
640                             SharedMemory::new("swap staging memory", guest_memory.memory_size())
641                                 .context("create staging shmem")?;
642 
643                         let regions = regions_from_guest_memory(&guest_memory);
644 
645                         let swap_file = match (swap_file_opt.take(), truncate_worker.take()) {
646                             (Some(file), None) => file,
647                             (None, Some(worker)) => {
648                                 worker.take_file().context("failed to get truncated swap")?
649                             }
650                             _ => bail!("Missing swap file"),
651                         };
652 
653                         let page_handler = match PageHandler::create(
654                             &swap_file,
655                             &staging_shmem,
656                             &regions,
657                             worker.channel.clone(),
658                         ) {
659                             Ok(page_handler) => page_handler,
660                             Err(e) => {
661                                 error!("failed to create swap handler: {:?}", e);
662                                 continue;
663                             }
664                         };
665 
666                         // TODO(b/272634283): Should just disable vmm-swap without crash.
667                         // SAFETY:
668                         // Safe because the regions are from guest memory and uffd_list contains all
669                         // the processes of crosvm.
670                         unsafe { register_regions(&regions, uffd_list.get_list()) }
671                             .context("register regions")?;
672 
673                         // events may contain unprocessed entries, but those pending events will be
674                         // immediately re-created when handle_vmm_swap checks wait_ctx because
675                         // WaitContext is level triggered.
676                         drop(events);
677 
678                         let mutex_transition = Mutex::new(state_transition);
679 
680                         bg_job_control.reset()?;
681                         let swap_result = std::thread::scope(|scope| {
682                             let result = handle_vmm_swap(
683                                 scope,
684                                 &wait_ctx,
685                                 &page_handler,
686                                 &mut uffd_list,
687                                 &guest_memory,
688                                 &regions,
689                                 &command_tube,
690                                 &worker,
691                                 &mutex_transition,
692                                 &bg_job_control,
693                             );
694                             // Abort background jobs to unblock ScopedJoinHandle eariler on a
695                             // failure.
696                             bg_job_control.abort();
697                             result
698                         })?;
699                         if swap_result.should_exit {
700                             return Ok(());
701                         }
702                         state_transition = mutex_transition.into_inner();
703 
704                         unregister_regions(&regions, uffd_list.get_list())
705                             .context("unregister regions")?;
706 
707                         // Truncate the swap file to hold minimum resources while disabled.
708                         if swap_result.slow_file_cleanup {
709                             truncate_worker = Some(
710                                 FileTruncator::new(swap_file)
711                                     .context("failed to start truncating")?,
712                             );
713                         } else {
714                             if let Err(e) = swap_file.set_len(0) {
715                                 error!("failed to clear swap file: {:?}", e);
716                             };
717                             swap_file_opt = Some(swap_file);
718                         }
719 
720                         info!("vmm-swap is disabled");
721                         // events are obsolete. Run `WaitContext::wait()` again
722                         break;
723                     }
724                     Command::Trim => {
725                         warn!("swap trim while disabled");
726                     }
727                     Command::SwapOut => {
728                         warn!("swap out while disabled");
729                     }
730                     Command::Disable { slow_file_cleanup } => {
731                         if !slow_file_cleanup {
732                             if let Some(worker) = truncate_worker.take() {
733                                 swap_file_opt =
734                                     Some(worker.take_file().context("failed to truncate swap")?);
735                             }
736                         }
737                     }
738                     Command::Exit => {
739                         return Ok(());
740                     }
741                     Command::Status => {
742                         let metrics = SwapMetrics {
743                             resident_pages: count_resident_pages(&guest_memory) as u64,
744                             ..Default::default()
745                         };
746                         let status = SwapStatus {
747                             state: SwapState::Ready,
748                             metrics,
749                             state_transition,
750                         };
751                         command_tube.send(&status).context("send status response")?;
752                         debug!("swap status: {:?}", status);
753                     }
754                 },
755                 Token::BackgroundJobCompleted => {
756                     error!("unexpected background job completed event while swap is disabled");
757                     bg_job_control.reset()?;
758                 }
759             };
760         }
761         if try_gc_uffds {
762             uffd_list.gc_dead_uffds().context("gc dead uffds")?;
763             try_gc_uffds = false;
764         }
765     }
766 }
767 
768 enum State<'scope> {
769     SwapOutPending,
770     Trim(ScopedJoinHandle<'scope, anyhow::Result<()>>),
771     SwapOutInProgress {
772         started_time: Instant,
773     },
774     SwapOutCompleted,
775     SwapInInProgress {
776         join_handle: ScopedJoinHandle<'scope, anyhow::Result<()>>,
777         slow_file_cleanup: bool,
778     },
779     Failed,
780 }
781 
782 impl From<&State<'_>> for SwapState {
from(state: &State<'_>) -> Self783     fn from(state: &State<'_>) -> Self {
784         match state {
785             State::SwapOutPending => SwapState::Pending,
786             State::Trim(_) => SwapState::TrimInProgress,
787             State::SwapOutInProgress { .. } => SwapState::SwapOutInProgress,
788             State::SwapOutCompleted => SwapState::Active,
789             State::SwapInInProgress { .. } => SwapState::SwapInInProgress,
790             State::Failed => SwapState::Failed,
791         }
792     }
793 }
794 
handle_enable_command<'scope>( state: State, bg_job_control: &BackgroundJobControl, page_handler: &PageHandler, guest_memory: &GuestMemory, worker: &Worker<MoveToStaging>, state_transition: &Mutex<SwapStateTransition>, ) -> anyhow::Result<State<'scope>>795 fn handle_enable_command<'scope>(
796     state: State,
797     bg_job_control: &BackgroundJobControl,
798     page_handler: &PageHandler,
799     guest_memory: &GuestMemory,
800     worker: &Worker<MoveToStaging>,
801     state_transition: &Mutex<SwapStateTransition>,
802 ) -> anyhow::Result<State<'scope>> {
803     match state {
804         State::SwapInInProgress { join_handle, .. } => {
805             info!("abort swap-in");
806             abort_background_job(join_handle, bg_job_control).context("abort swap-in")?;
807         }
808         State::Trim(join_handle) => {
809             info!("abort trim");
810             abort_background_job(join_handle, bg_job_control).context("abort trim")?;
811         }
812         _ => {}
813     }
814 
815     info!("start moving memory to staging");
816     match move_guest_to_staging(page_handler, guest_memory, worker) {
817         Ok(new_state_transition) => {
818             info!(
819                 "move {} pages to staging in {} ms",
820                 new_state_transition.pages, new_state_transition.time_ms
821             );
822             *state_transition.lock() = new_state_transition;
823             Ok(State::SwapOutPending)
824         }
825         Err(e) => {
826             error!("failed to move memory to staging: {}", e);
827             *state_transition.lock() = SwapStateTransition::default();
828             Ok(State::Failed)
829         }
830     }
831 }
832 
move_guest_to_staging( page_handler: &PageHandler, guest_memory: &GuestMemory, worker: &Worker<MoveToStaging>, ) -> anyhow::Result<SwapStateTransition>833 fn move_guest_to_staging(
834     page_handler: &PageHandler,
835     guest_memory: &GuestMemory,
836     worker: &Worker<MoveToStaging>,
837 ) -> anyhow::Result<SwapStateTransition> {
838     let start_time = std::time::Instant::now();
839 
840     let mut pages = 0;
841 
842     let result = guest_memory.regions().try_for_each(|region| {
843         // SAFETY:
844         // safe because:
845         // * all the regions are registered to all userfaultfd
846         // * no process access the guest memory
847         // * page fault events are handled by PageHandler
848         // * wait for all the copy completed within _processes_guard
849         pages += unsafe {
850             page_handler.move_to_staging(region.host_addr, region.shm, region.shm_offset)
851         }
852         .context("move to staging")? as u64;
853         Ok(())
854     });
855     worker.channel.wait_complete();
856 
857     match result {
858         Ok(()) => {
859             let resident_pages = count_resident_pages(guest_memory);
860             if resident_pages > 0 {
861                 error!(
862                     "active page is not zero just after swap out but {} pages",
863                     resident_pages
864                 );
865             }
866             let time_ms = start_time.elapsed().as_millis().try_into()?;
867             Ok(SwapStateTransition { pages, time_ms })
868         }
869         Err(e) => Err(e),
870     }
871 }
872 
abort_background_job<T>( join_handle: ScopedJoinHandle<'_, anyhow::Result<T>>, bg_job_control: &BackgroundJobControl, ) -> anyhow::Result<T>873 fn abort_background_job<T>(
874     join_handle: ScopedJoinHandle<'_, anyhow::Result<T>>,
875     bg_job_control: &BackgroundJobControl,
876 ) -> anyhow::Result<T> {
877     bg_job_control.abort();
878     // Wait until the background job is aborted and the thread finishes.
879     let result = join_handle
880         .join()
881         .expect("panic on the background job thread");
882     bg_job_control.reset().context("reset swap in event")?;
883     result.context("failure on background job thread")
884 }
885 
/// Outcome of `handle_vmm_swap` once the monitor leaves the vmm-swap enabled state.
struct VmmSwapResult {
    /// Set when `Command::Exit` was handled and the monitor should return.
    should_exit: bool,
    /// Set to clean up the swap file with `FileTruncator` rather than truncating it immediately.
    slow_file_cleanup: bool,
}
890 
handle_vmm_swap<'scope, 'env>( scope: &'scope Scope<'scope, 'env>, wait_ctx: &WaitContext<Token>, page_handler: &'env PageHandler<'env>, uffd_list: &'env mut UffdList<Token, DeadUffdCheckerImpl>, guest_memory: &GuestMemory, regions: &[Range<usize>], command_tube: &Tube, worker: &Worker<MoveToStaging>, state_transition: &'env Mutex<SwapStateTransition>, bg_job_control: &'env BackgroundJobControl, ) -> anyhow::Result<VmmSwapResult>891 fn handle_vmm_swap<'scope, 'env>(
892     scope: &'scope Scope<'scope, 'env>,
893     wait_ctx: &WaitContext<Token>,
894     page_handler: &'env PageHandler<'env>,
895     uffd_list: &'env mut UffdList<Token, DeadUffdCheckerImpl>,
896     guest_memory: &GuestMemory,
897     regions: &[Range<usize>],
898     command_tube: &Tube,
899     worker: &Worker<MoveToStaging>,
900     state_transition: &'env Mutex<SwapStateTransition>,
901     bg_job_control: &'env BackgroundJobControl,
902 ) -> anyhow::Result<VmmSwapResult> {
903     let mut state = match move_guest_to_staging(page_handler, guest_memory, worker) {
904         Ok(transition) => {
905             info!(
906                 "move {} pages to staging in {} ms",
907                 transition.pages, transition.time_ms
908             );
909             *state_transition.lock() = transition;
910             State::SwapOutPending
911         }
912         Err(e) => {
913             error!("failed to move memory to staging: {}", e);
914             *state_transition.lock() = SwapStateTransition::default();
915             State::Failed
916         }
917     };
918     command_tube
919         .send(&SwapStatus::dummy())
920         .context("send enable finish signal")?;
921 
922     let mut try_gc_uffds = false;
923     loop {
924         let events = match &state {
925             State::SwapOutInProgress { started_time } => {
926                 let events = wait_ctx
927                     .wait_timeout(Duration::ZERO)
928                     .context("wait poll events")?;
929 
930                 // TODO(b/273129441): swap out on a background thread.
931                 // Proceed swap out only when there is no page fault (or other) events.
932                 if events.is_empty() {
933                     match page_handler.swap_out(MAX_SWAP_CHUNK_SIZE) {
934                         Ok(num_pages) => {
935                             let mut state_transition = state_transition.lock();
936                             state_transition.pages += num_pages as u64;
937                             state_transition.time_ms =
938                                 started_time.elapsed().as_millis().try_into()?;
939                             if num_pages == 0 {
940                                 info!(
941                                     "swap out all {} pages to file in {} ms",
942                                     state_transition.pages, state_transition.time_ms
943                                 );
944                                 state = State::SwapOutCompleted;
945                             }
946                         }
947                         Err(e) => {
948                             error!("failed to swap out: {:?}", e);
949                             state = State::Failed;
950                             *state_transition.lock() = SwapStateTransition::default();
951                         }
952                     }
953                     continue;
954                 }
955 
956                 events
957             }
958             _ => wait_ctx.wait().context("wait poll events")?,
959         };
960 
961         for event in events.iter() {
962             match event.token {
963                 Token::UffdEvents(id_uffd) => {
964                     let uffd = uffd_list
965                         .get(id_uffd)
966                         .with_context(|| format!("uffd is not found for idx: {}", id_uffd))?;
967                     // Userfaultfd does not work as level triggered but as edge triggered. We need
968                     // to read all the events in the userfaultfd here.
969                     // TODO(kawasin): Use [userfaultfd::Uffd::read_events()] for performance.
970                     while let Some(event) = uffd.read_event().context("read userfaultfd event")? {
971                         match event {
972                             UffdEvent::Pagefault { addr, .. } => {
973                                 match page_handler.handle_page_fault(uffd, addr as usize) {
974                                     Ok(()) => {}
975                                     Err(PageHandlerError::Userfaultfd(UffdError::UffdClosed)) => {
976                                         // Do nothing for the uffd. It will be garbage-collected
977                                         // when a new uffd is registered.
978                                         break;
979                                     }
980                                     Err(e) => {
981                                         bail!("failed to handle page fault: {:?}", e);
982                                     }
983                                 }
984                             }
985                             UffdEvent::Remove { start, end } => {
986                                 page_handler
987                                     .handle_page_remove(start as usize, end as usize)
988                                     .context("handle fault")?;
989                             }
990                             event => {
991                                 bail!("unsupported UffdEvent: {:?}", event);
992                             }
993                         }
994                     }
995                 }
996                 Token::Command => match command_tube
997                     .recv::<Command>()
998                     .context("recv swap command")?
999                 {
1000                     Command::ProcessForked { uffd, reply_tube } => {
1001                         debug!("new fork uffd: {:?}", uffd);
1002                         let result = if let Err(e) = {
1003                             // SAFETY: regions is generated from the guest memory
1004                             // SAFETY: the uffd is from a new process.
1005                             unsafe { register_regions(regions, std::array::from_ref(&uffd)) }
1006                         } {
1007                             error!("failed to setup uffd: {:?}", e);
1008                             false
1009                         } else {
1010                             match uffd_list.register(uffd) {
1011                                 Ok(is_dynamic_uffd) => {
1012                                     try_gc_uffds = is_dynamic_uffd;
1013                                     true
1014                                 }
1015                                 Err(e) => {
1016                                     error!("failed to register uffd to list: {:?}", e);
1017                                     false
1018                                 }
1019                             }
1020                         };
1021                         if let Err(e) = reply_tube.send(&result) {
1022                             error!("failed to response to new process: {:?}", e);
1023                         }
1024                     }
1025                     Command::StaticDeviceSetupComplete(num_static_devices) => {
1026                         info!("static device setup complete: n={}", num_static_devices);
1027                         if !uffd_list.set_num_static_devices(num_static_devices) {
1028                             bail!("failed to set num_static_devices");
1029                         }
1030                     }
1031                     Command::Enable => {
1032                         let result = handle_enable_command(
1033                             state,
1034                             bg_job_control,
1035                             page_handler,
1036                             guest_memory,
1037                             worker,
1038                             state_transition,
1039                         );
1040                         command_tube
1041                             .send(&SwapStatus::dummy())
1042                             .context("send enable finish signal")?;
1043                         state = result?;
1044                     }
1045                     Command::Trim => match &state {
1046                         State::SwapOutPending => {
1047                             *state_transition.lock() = SwapStateTransition::default();
1048                             let join_handle = scope.spawn(|| {
1049                                 let mut ctx = page_handler.start_trim();
1050                                 let job = bg_job_control.new_job();
1051                                 let start_time = std::time::Instant::now();
1052 
1053                                 while !job.is_aborted() {
1054                                     if let Some(trimmed_pages) =
1055                                         ctx.trim_pages(MAX_TRIM_PAGES).context("trim pages")?
1056                                     {
1057                                         let mut state_transition = state_transition.lock();
1058                                         state_transition.pages += trimmed_pages as u64;
1059                                         state_transition.time_ms =
1060                                             start_time.elapsed().as_millis().try_into()?;
1061                                     } else {
1062                                         // Traversed all pages.
1063                                         break;
1064                                     }
1065                                 }
1066 
1067                                 if job.is_aborted() {
1068                                     info!("trim is aborted");
1069                                 } else {
1070                                     info!(
1071                                         "trimmed {} clean pages and {} zero pages",
1072                                         ctx.trimmed_clean_pages(),
1073                                         ctx.trimmed_zero_pages()
1074                                     );
1075                                 }
1076                                 Ok(())
1077                             });
1078 
1079                             state = State::Trim(join_handle);
1080                             info!("start trimming staging memory");
1081                         }
1082                         state => {
1083                             warn!(
1084                                 "swap trim is not ready. state: {:?}",
1085                                 SwapState::from(state)
1086                             );
1087                         }
1088                     },
1089                     Command::SwapOut => match &state {
1090                         State::SwapOutPending => {
1091                             state = State::SwapOutInProgress {
1092                                 started_time: std::time::Instant::now(),
1093                             };
1094                             *state_transition.lock() = SwapStateTransition::default();
1095                             info!("start swapping out");
1096                         }
1097                         state => {
1098                             warn!("swap out is not ready. state: {:?}", SwapState::from(state));
1099                         }
1100                     },
1101                     Command::Disable { slow_file_cleanup } => {
1102                         match state {
1103                             State::Trim(join_handle) => {
1104                                 info!("abort trim");
1105                                 abort_background_job(join_handle, bg_job_control)
1106                                     .context("abort trim")?;
1107                             }
1108                             State::SwapOutInProgress { .. } => {
1109                                 info!("swap out is aborted");
1110                             }
1111                             State::SwapInInProgress { join_handle, .. } => {
1112                                 info!("swap in is in progress");
1113                                 state = State::SwapInInProgress {
1114                                     join_handle,
1115                                     slow_file_cleanup,
1116                                 };
1117                                 continue;
1118                             }
1119                             _ => {}
1120                         }
1121                         *state_transition.lock() = SwapStateTransition::default();
1122 
1123                         let uffd = uffd_list.clone_main_uffd().context("clone main uffd")?;
1124                         let join_handle = scope.spawn(move || {
1125                             let mut ctx = page_handler.start_swap_in();
1126                             let job = bg_job_control.new_job();
1127                             let start_time = std::time::Instant::now();
1128                             while !job.is_aborted() {
1129                                 match ctx.swap_in(&uffd, MAX_SWAP_CHUNK_SIZE) {
1130                                     Ok(num_pages) => {
1131                                         if num_pages == 0 {
1132                                             break;
1133                                         }
1134                                         let mut state_transition = state_transition.lock();
1135                                         state_transition.pages += num_pages as u64;
1136                                         state_transition.time_ms =
1137                                             start_time.elapsed().as_millis().try_into()?;
1138                                     }
1139                                     Err(e) => {
1140                                         bail!("failed to swap in: {:?}", e);
1141                                     }
1142                                 }
1143                             }
1144                             if job.is_aborted() {
1145                                 info!("swap in is aborted");
1146                             }
1147                             Ok(())
1148                         });
1149                         state = State::SwapInInProgress {
1150                             join_handle,
1151                             slow_file_cleanup,
1152                         };
1153 
1154                         info!("start swapping in");
1155                     }
1156                     Command::Exit => {
1157                         match state {
1158                             State::SwapInInProgress { join_handle, .. } => {
1159                                 // Wait until swap-in finishes.
1160                                 if let Err(e) = join_handle.join() {
1161                                     bail!("failed to join swap in thread: {:?}", e);
1162                                 }
1163                                 return Ok(VmmSwapResult {
1164                                     should_exit: true,
1165                                     slow_file_cleanup: false,
1166                                 });
1167                             }
1168                             State::Trim(join_handle) => {
1169                                 abort_background_job(join_handle, bg_job_control)
1170                                     .context("abort trim")?;
1171                             }
1172                             _ => {}
1173                         }
1174                         let mut ctx = page_handler.start_swap_in();
1175                         // Swap-in all before exit.
1176                         while ctx
1177                             .swap_in(uffd_list.main_uffd(), MAX_SWAP_CHUNK_SIZE)
1178                             .context("swap in")?
1179                             > 0
1180                         {}
1181                         return Ok(VmmSwapResult {
1182                             should_exit: true,
1183                             slow_file_cleanup: false,
1184                         });
1185                     }
1186                     Command::Status => {
1187                         let mut metrics = SwapMetrics {
1188                             resident_pages: count_resident_pages(guest_memory) as u64,
1189                             ..Default::default()
1190                         };
1191                         page_handler.load_metrics(&mut metrics);
1192                         let status = SwapStatus {
1193                             state: (&state).into(),
1194                             metrics,
1195                             state_transition: *state_transition.lock(),
1196                         };
1197                         command_tube.send(&status).context("send status response")?;
1198                         debug!("swap status: {:?}", status);
1199                     }
1200                 },
1201                 Token::BackgroundJobCompleted => {
1202                     // Reset the completed event.
1203                     if !bg_job_control
1204                         .reset()
1205                         .context("reset background job event")?
1206                     {
1207                         // When the job is aborted and the event is comsumed by reset(), the token
1208                         // `Token::BackgroundJobCompleted` may remain in the `events`. Just ignore
1209                         // the obsolete token here.
1210                         continue;
1211                     }
1212                     match state {
1213                         State::SwapInInProgress {
1214                             join_handle,
1215                             slow_file_cleanup,
1216                         } => {
1217                             join_handle
1218                                 .join()
1219                                 .expect("panic on the background job thread")
1220                                 .context("swap in finish")?;
1221                             let state_transition = state_transition.lock();
1222                             info!(
1223                                 "swap in all {} pages in {} ms.",
1224                                 state_transition.pages, state_transition.time_ms
1225                             );
1226                             return Ok(VmmSwapResult {
1227                                 should_exit: false,
1228                                 slow_file_cleanup,
1229                             });
1230                         }
1231                         State::Trim(join_handle) => {
1232                             join_handle
1233                                 .join()
1234                                 .expect("panic on the background job thread")
1235                                 .context("trim finish")?;
1236                             let state_transition = state_transition.lock();
1237                             info!(
1238                                 "trimmed {} pages in {} ms.",
1239                                 state_transition.pages, state_transition.time_ms
1240                             );
1241                             state = State::SwapOutPending;
1242                         }
1243                         state => {
1244                             bail!(
1245                                 "background job completed but the actual state is {:?}",
1246                                 SwapState::from(&state)
1247                             );
1248                         }
1249                     }
1250                 }
1251             };
1252         }
1253         if try_gc_uffds {
1254             uffd_list.gc_dead_uffds().context("gc dead uffds")?;
1255             try_gc_uffds = false;
1256         }
1257     }
1258 }
1259