xref: /aosp_15_r20/external/crosvm/base_tokio/src/sys/linux/tube.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::os::fd::AsRawFd;
6 
7 use tokio::io::unix::AsyncFd;
8 
9 /// An async version of `base::Tube`.
10 pub struct TubeTokio(AsyncFd<base::Tube>);
11 
12 impl TubeTokio {
new(tube: base::Tube) -> anyhow::Result<Self>13     pub fn new(tube: base::Tube) -> anyhow::Result<Self> {
14         base::add_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)?;
15         Ok(Self(AsyncFd::new(tube)?))
16     }
17 
into_inner(self) -> base::Tube18     pub async fn into_inner(self) -> base::Tube {
19         let tube = self.0.into_inner();
20         base::clear_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)
21             .expect("failed to clear O_NONBLOCK");
22         tube
23     }
24 
send<T: serde::Serialize + Send + 'static>( &mut self, msg: T, ) -> base::TubeResult<()>25     pub async fn send<T: serde::Serialize + Send + 'static>(
26         &mut self,
27         msg: T,
28     ) -> base::TubeResult<()> {
29         loop {
30             let mut guard = self.0.writable().await.map_err(base::TubeError::Send)?;
31             let io_result = guard.try_io(|inner| {
32                 // Re-using the non-async send is potentially hazardous since it isn't explicitly
33                 // written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
34                 // single write syscall, it should be OK.
35                 let r = inner.get_ref().send(&msg);
36                 // Transpose the `std::io::Error` errors outside so that `try_io` can check them
37                 // for `WouldBlock`.
38                 match r {
39                     Ok(x) => Ok(Ok(x)),
40                     Err(base::TubeError::Send(e)) => Err(e),
41                     Err(e) => Ok(Err(e)),
42                 }
43             });
44 
45             match io_result {
46                 Ok(result) => {
47                     return match result {
48                         Ok(Ok(x)) => Ok(x),
49                         Ok(Err(e)) => Err(e),
50                         Err(e) => Err(base::TubeError::Send(e)),
51                     }
52                 }
53                 Err(_would_block) => continue,
54             }
55         }
56     }
57 
recv<T: serde::de::DeserializeOwned + Send + 'static>( &mut self, ) -> base::TubeResult<T>58     pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(
59         &mut self,
60     ) -> base::TubeResult<T> {
61         loop {
62             let mut guard = self.0.readable().await.map_err(base::TubeError::Recv)?;
63             let io_result = guard.try_io(|inner| {
64                 // Re-using the non-async recv is potentially hazardous since it isn't explicitly
65                 // written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
66                 // single read syscall, it should be OK.
67                 let r = inner.get_ref().recv();
68                 // Transpose the `std::io::Error` errors outside so that `try_io` can check them
69                 // for `WouldBlock`.
70                 match r {
71                     Ok(x) => Ok(Ok(x)),
72                     Err(base::TubeError::Recv(e)) => Err(e),
73                     Err(e) => Ok(Err(e)),
74                 }
75             });
76 
77             match io_result {
78                 Ok(result) => {
79                     return match result {
80                         Ok(Ok(x)) => Ok(x),
81                         Ok(Err(e)) => Err(e),
82                         Err(e) => Err(base::TubeError::Recv(e)),
83                     }
84                 }
85                 Err(_would_block) => continue,
86             }
87         }
88     }
89 }
90