1 // Copyright 2024 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::os::fd::AsRawFd; 6 7 use tokio::io::unix::AsyncFd; 8 9 /// An async version of `base::Tube`. 10 pub struct TubeTokio(AsyncFd<base::Tube>); 11 12 impl TubeTokio { new(tube: base::Tube) -> anyhow::Result<Self>13 pub fn new(tube: base::Tube) -> anyhow::Result<Self> { 14 base::add_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)?; 15 Ok(Self(AsyncFd::new(tube)?)) 16 } 17 into_inner(self) -> base::Tube18 pub async fn into_inner(self) -> base::Tube { 19 let tube = self.0.into_inner(); 20 base::clear_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK) 21 .expect("failed to clear O_NONBLOCK"); 22 tube 23 } 24 send<T: serde::Serialize + Send + 'static>( &mut self, msg: T, ) -> base::TubeResult<()>25 pub async fn send<T: serde::Serialize + Send + 'static>( 26 &mut self, 27 msg: T, 28 ) -> base::TubeResult<()> { 29 loop { 30 let mut guard = self.0.writable().await.map_err(base::TubeError::Send)?; 31 let io_result = guard.try_io(|inner| { 32 // Re-using the non-async send is potentially hazardous since it isn't explicitly 33 // written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a 34 // single write syscall, it should be OK. 35 let r = inner.get_ref().send(&msg); 36 // Transpose the `std::io::Error` errors outside so that `try_io` can check them 37 // for `WouldBlock`. 38 match r { 39 Ok(x) => Ok(Ok(x)), 40 Err(base::TubeError::Send(e)) => Err(e), 41 Err(e) => Ok(Err(e)), 42 } 43 }); 44 45 match io_result { 46 Ok(result) => { 47 return match result { 48 Ok(Ok(x)) => Ok(x), 49 Ok(Err(e)) => Err(e), 50 Err(e) => Err(base::TubeError::Send(e)), 51 } 52 } 53 Err(_would_block) => continue, 54 } 55 } 56 } 57 recv<T: serde::de::DeserializeOwned + Send + 'static>( &mut self, ) -> base::TubeResult<T>58 pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>( 59 &mut self, 60 ) -> base::TubeResult<T> { 61 loop { 62 let mut guard = self.0.readable().await.map_err(base::TubeError::Recv)?; 63 let io_result = guard.try_io(|inner| { 64 // Re-using the non-async recv is potentially hazardous since it isn't explicitly 65 // written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a 66 // single read syscall, it should be OK. 67 let r = inner.get_ref().recv(); 68 // Transpose the `std::io::Error` errors outside so that `try_io` can check them 69 // for `WouldBlock`. 70 match r { 71 Ok(x) => Ok(Ok(x)), 72 Err(base::TubeError::Recv(e)) => Err(e), 73 Err(e) => Ok(Err(e)), 74 } 75 }); 76 77 match io_result { 78 Ok(result) => { 79 return match result { 80 Ok(Ok(x)) => Ok(x), 81 Ok(Err(e)) => Err(e), 82 Err(e) => Err(base::TubeError::Recv(e)), 83 } 84 } 85 Err(_would_block) => continue, 86 } 87 } 88 } 89 } 90