// Copyright 2021 The ChromiumOS Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. use std::os::unix::prelude::AsRawFd; use std::os::unix::prelude::RawFd; use std::time::Duration; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; use crate::descriptor::AsRawDescriptor; use crate::descriptor_reflection::deserialize_with_descriptors; use crate::descriptor_reflection::SerializeDescriptors; use crate::handle_eintr; use crate::tube::Error; use crate::tube::RecvTube; use crate::tube::Result; use crate::tube::SendTube; use crate::BlockingMode; use crate::FramingMode; use crate::RawDescriptor; use crate::ReadNotifier; use crate::ScmSocket; use crate::StreamChannel; use crate::UnixSeqpacket; use crate::SCM_SOCKET_MAX_FD_COUNT; // This size matches the inline buffer size of CmsgBuffer. const TUBE_MAX_FDS: usize = 32; /// Bidirectional tube that support both send and recv. #[derive(Serialize, Deserialize)] pub struct Tube { socket: ScmSocket, } impl Tube { /// Create a pair of connected tubes. Request is sent in one direction while response is in the /// other direction. pub fn pair() -> Result<(Tube, Tube)> { let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message) .map_err(|errno| Error::Pair(std::io::Error::from(errno)))?; let tube1 = Tube::new(socket1)?; let tube2 = Tube::new(socket2)?; Ok((tube1, tube2)) } /// Create a new `Tube` from a `StreamChannel`. /// The StreamChannel must use FramingMode::Message (meaning, must use a SOCK_SEQPACKET as the /// underlying socket type), otherwise, this method returns an error. pub fn new(socket: StreamChannel) -> Result { match socket.get_framing_mode() { FramingMode::Message => Ok(Tube { socket: socket.try_into().map_err(Error::DupDescriptor)?, }), FramingMode::Byte => Err(Error::InvalidFramingMode), } } /// Create a new `Tube` from a UnixSeqpacket. The StreamChannel is implicitly constructed to /// have the right FramingMode by being constructed from a UnixSeqpacket. pub fn new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result { Ok(Tube { socket: StreamChannel::from_unix_seqpacket(sock) .try_into() .map_err(Error::DupDescriptor)?, }) } /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a /// directional Tube pair instead. #[deprecated] pub fn try_clone(&self) -> Result { self.socket .inner() .try_clone() .map(Tube::new) .map_err(Error::Clone)? } /// Sends a message via a Tube. /// The number of file descriptors that this method can send is limited to `TUBE_MAX_FDS`. /// If you want to send more descriptors, use `send_with_max_fds` instead. pub fn send(&self, msg: &T) -> Result<()> { self.send_with_max_fds(msg, TUBE_MAX_FDS) } /// Sends a message with at most `max_fds` file descriptors via a Tube. /// Note that `max_fds` must not exceed `SCM_SOCKET_MAX_FD_COUNT` (= 253). pub fn send_with_max_fds(&self, msg: &T, max_fds: usize) -> Result<()> { if max_fds > SCM_SOCKET_MAX_FD_COUNT { return Err(Error::SendTooManyFds); } let msg_serialize = SerializeDescriptors::new(&msg); let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; let msg_descriptors = msg_serialize.into_descriptors(); if msg_descriptors.len() > max_fds { return Err(Error::SendTooManyFds); } handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors)) .map_err(Error::Send)?; Ok(()) } /// Recieves a message from a Tube. /// If the sender sent file descriptors more than TUBE_MAX_FDS with `send_with_max_fds`, use /// `recv_with_max_fds` instead. pub fn recv(&self) -> Result { self.recv_with_max_fds(TUBE_MAX_FDS) } /// Recieves a message with at most `max_fds` file descriptors from a Tube. pub fn recv_with_max_fds(&self, max_fds: usize) -> Result { if max_fds > SCM_SOCKET_MAX_FD_COUNT { return Err(Error::RecvTooManyFds); } // WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube // is readable, then a call to `Tube::recv` will not block (which ought to be true since we // use SOCK_SEQPACKET and a single recvmsg call currently). let msg_size = handle_eintr!(self.socket.inner().peek_size()).map_err(Error::Recv)?; // This buffer is the right size, as the size received in peek_size() represents the size // of only the message itself and not the file descriptors. The descriptors are stored // separately in msghdr::msg_control. let mut msg_json = vec![0u8; msg_size]; let (msg_json_size, msg_descriptors) = handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds)) .map_err(Error::Recv)?; if msg_json_size == 0 { return Err(Error::Disconnected); } deserialize_with_descriptors( || serde_json::from_slice(&msg_json[0..msg_json_size]), msg_descriptors, ) .map_err(Error::Json) } pub fn set_send_timeout(&self, timeout: Option) -> Result<()> { self.socket .inner() .set_write_timeout(timeout) .map_err(Error::SetSendTimeout) } pub fn set_recv_timeout(&self, timeout: Option) -> Result<()> { self.socket .inner() .set_read_timeout(timeout) .map_err(Error::SetRecvTimeout) } #[cfg(feature = "proto_tube")] fn send_proto(&self, msg: &M) -> Result<()> { let bytes = msg.write_to_bytes().map_err(Error::Proto)?; let no_fds: [RawFd; 0] = []; handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?; Ok(()) } #[cfg(feature = "proto_tube")] fn recv_proto(&self) -> Result { let msg_size = handle_eintr!(self.socket.inner().peek_size()).map_err(Error::Recv)?; let mut msg_bytes = vec![0u8; msg_size]; let (msg_bytes_size, _) = handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS)) .map_err(Error::Recv)?; if msg_bytes_size == 0 { return Err(Error::Disconnected); } protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto) } } impl AsRawDescriptor for Tube { fn as_raw_descriptor(&self) -> RawDescriptor { self.socket.as_raw_descriptor() } } impl AsRawFd for Tube { fn as_raw_fd(&self) -> RawFd { self.socket.inner().as_raw_fd() } } impl ReadNotifier for Tube { fn get_read_notifier(&self) -> &dyn AsRawDescriptor { &self.socket } } impl AsRawDescriptor for SendTube { fn as_raw_descriptor(&self) -> RawDescriptor { self.0.as_raw_descriptor() } } impl AsRawDescriptor for RecvTube { fn as_raw_descriptor(&self) -> RawDescriptor { self.0.as_raw_descriptor() } } /// Wrapper for Tube used for sending and receiving protos - avoids extra overhead of serialization /// via serde_json. Since protos should be standalone objects we do not support sending of file /// descriptors as a normal Tube would. #[cfg(feature = "proto_tube")] pub struct ProtoTube(Tube); #[cfg(feature = "proto_tube")] impl ProtoTube { pub fn pair() -> Result<(ProtoTube, ProtoTube)> { Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2))) } pub fn send_proto(&self, msg: &M) -> Result<()> { self.0.send_proto(msg) } pub fn recv_proto(&self) -> Result { self.0.recv_proto() } pub fn new_from_unix_seqpacket(sock: UnixSeqpacket) -> Result { Ok(ProtoTube(Tube::new_from_unix_seqpacket(sock)?)) } } #[cfg(all(feature = "proto_tube", test))] #[allow(unused_variables)] mod tests { // not testing this proto specifically, just need an existing one to test the ProtoTube. use protos::cdisk_spec::ComponentDisk; use super::*; #[test] fn tube_serializes_and_deserializes() { let (pt1, pt2) = ProtoTube::pair().unwrap(); let proto = ComponentDisk { file_path: "/some/cool/path".to_string(), offset: 99, ..ComponentDisk::new() }; pt1.send_proto(&proto).unwrap(); let recv_proto = pt2.recv_proto().unwrap(); assert!(proto.eq(&recv_proto)); } }