// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::io;
use std::os::fd::AsRawFd;
use std::sync::Arc;

use base::sys::fallocate;
use base::sys::FallocateMode;
use base::AsRawDescriptor;
use base::VolatileSlice;
use remain::sorted;
use thiserror::Error as ThisError;

use super::fd_executor;
use super::fd_executor::EpollReactor;
use super::fd_executor::RegisteredSource;
use crate::common_executor::RawExecutor;
use crate::mem::BackingMemory;
use crate::AsyncError;
use crate::AsyncResult;
use crate::MemRegion;

#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
    /// An error occurred attempting to register a waker with the executor.
    #[error("An error occurred attempting to register a waker with the executor: {0}.")]
    AddingWaker(fd_executor::Error),
    /// Failed to discard a block.
    #[error("Failed to discard a block: {0}")]
    Discard(base::Error),
    /// An executor error occurred.
    #[error("An executor error occurred: {0}")]
    Executor(fd_executor::Error),
    /// An error occurred when executing fallocate synchronously.
    #[error("An error occurred when executing fallocate synchronously: {0}")]
    Fallocate(base::Error),
    /// An error occurred when executing fdatasync synchronously.
    #[error("An error occurred when executing fdatasync synchronously: {0}")]
    Fdatasync(base::Error),
    /// An error occurred when executing fsync synchronously.
    #[error("An error occurred when executing fsync synchronously: {0}")]
    Fsync(base::Error),
    /// An error occurred when reading the FD.
    #[error("An error occurred when reading the FD: {0}.")]
    Read(base::Error),
    /// Can't seek file.
    #[error("An error occurred when seeking the FD: {0}.")]
    Seeking(base::Error),
    /// An error occurred when writing the FD.
    #[error("An error occurred when writing the FD: {0}.")]
    Write(base::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            AddingWaker(e) => e.into(),
            Executor(e) => e.into(),
            Discard(e) => e.into(),
            Fallocate(e) => e.into(),
            Fdatasync(e) => e.into(),
            Fsync(e) => e.into(),
            Read(e) => e.into(),
            Seeking(e) => e.into(),
            Write(e) => e.into(),
        }
    }
}

impl From<Error> for AsyncError {
    fn from(e: Error) -> AsyncError {
        AsyncError::SysVariants(e.into())
    }
}

/// Async wrapper for an IO source that uses the FD executor to drive async operations.
pub struct PollSource<F> {
    registered_source: RegisteredSource<F>,
}

impl<F: AsRawDescriptor> PollSource<F> {
    /// Create a new `PollSource` from the given IO source.
    pub fn new(f: F, ex: &Arc<RawExecutor<EpollReactor>>) -> Result<Self> {
        RegisteredSource::new(ex, f)
            .map(|f| PollSource {
                registered_source: f,
            })
            .map_err(Error::Executor)
    }
}

impl<F: AsRawDescriptor> PollSource<F> {
    /// Reads from the IO source at `file_offset` and fills the given `vec`.
    pub async fn read_to_vec(
        &self,
        file_offset: Option<u64>,
        mut vec: Vec<u8>,
    ) -> AsyncResult<(usize, Vec<u8>)> {
        loop {
            let res = if let Some(offset) = file_offset {
                // SAFETY:
                // Safe because this will only modify `vec` and we check the return value.
                unsafe {
                    libc::pread64(
                        self.registered_source.duped_fd.as_raw_fd(),
                        vec.as_mut_ptr() as *mut libc::c_void,
                        vec.len(),
                        offset as libc::off64_t,
                    )
                }
            } else {
                // SAFETY:
                // Safe because this will only modify `vec` and we check the return value.
                unsafe {
                    libc::read(
                        self.registered_source.duped_fd.as_raw_fd(),
                        vec.as_mut_ptr() as *mut libc::c_void,
                        vec.len(),
                    )
                }
            };

            if res >= 0 {
                return Ok((res as usize, vec));
            }

            match base::Error::last() {
                e if e.errno() == libc::EWOULDBLOCK => {
                    let op = self
                        .registered_source
                        .wait_readable()
                        .map_err(Error::AddingWaker)?;
                    op.await.map_err(Error::Executor)?;
                }
                e => return Err(Error::Read(e).into()),
            }
        }
    }

    /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
    pub async fn read_to_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        mem_offsets: impl IntoIterator<Item = MemRegion>,
    ) -> AsyncResult<usize> {
        let mut iovecs = mem_offsets
            .into_iter()
            .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
            .collect::<Vec<VolatileSlice>>();

        loop {
            let res = if let Some(offset) = file_offset {
                // SAFETY:
                // Safe because we trust the kernel not to write past the length given and the
                // length is guaranteed to be valid from the pointer by io_slice_mut.
                unsafe {
                    libc::preadv64(
                        self.registered_source.duped_fd.as_raw_fd(),
                        iovecs.as_mut_ptr() as *mut _,
                        iovecs.len() as i32,
                        offset as libc::off64_t,
                    )
                }
            } else {
                // SAFETY:
                // Safe because we trust the kernel not to write past the length given and the
                // length is guaranteed to be valid from the pointer by io_slice_mut.
                unsafe {
                    libc::readv(
                        self.registered_source.duped_fd.as_raw_fd(),
                        iovecs.as_mut_ptr() as *mut _,
                        iovecs.len() as i32,
                    )
                }
            };

            if res >= 0 {
                return Ok(res as usize);
            }

            match base::Error::last() {
                e if e.errno() == libc::EWOULDBLOCK => {
                    let op = self
                        .registered_source
                        .wait_readable()
                        .map_err(Error::AddingWaker)?;
                    op.await.map_err(Error::Executor)?;
                }
                e => return Err(Error::Read(e).into()),
            }
        }
    }

    /// Waits for the FD of `self` to be readable.
    pub async fn wait_readable(&self) -> AsyncResult<()> {
        let op = self
            .registered_source
            .wait_readable()
            .map_err(Error::AddingWaker)?;
        op.await.map_err(Error::Executor)?;
        Ok(())
    }

    /// Writes from the given `vec` to the file starting at `file_offset`.
    pub async fn write_from_vec(
        &self,
        file_offset: Option<u64>,
        vec: Vec<u8>,
    ) -> AsyncResult<(usize, Vec<u8>)> {
        loop {
            let res = if let Some(offset) = file_offset {
                // SAFETY:
                // Safe because this will not modify any memory and we check the return value.
                unsafe {
                    libc::pwrite64(
                        self.registered_source.duped_fd.as_raw_fd(),
                        vec.as_ptr() as *const libc::c_void,
                        vec.len(),
                        offset as libc::off64_t,
                    )
                }
            } else {
                // SAFETY:
                // Safe because this will not modify any memory and we check the return value.
                unsafe {
                    libc::write(
                        self.registered_source.duped_fd.as_raw_fd(),
                        vec.as_ptr() as *const libc::c_void,
                        vec.len(),
                    )
                }
            };

            if res >= 0 {
                return Ok((res as usize, vec));
            }

            match base::Error::last() {
                e if e.errno() == libc::EWOULDBLOCK => {
                    let op = self
                        .registered_source
                        .wait_writable()
                        .map_err(Error::AddingWaker)?;
                    op.await.map_err(Error::Executor)?;
                }
                e => return Err(Error::Write(e).into()),
            }
        }
    }

    /// Writes from the given `mem` at the given offsets to the file starting at `file_offset`.
    pub async fn write_from_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        mem_offsets: impl IntoIterator<Item = MemRegion>,
    ) -> AsyncResult<usize> {
        let iovecs = mem_offsets
            .into_iter()
            .map(|mem_range| mem.get_volatile_slice(mem_range))
            .filter_map(|r| r.ok())
            .collect::<Vec<VolatileSlice>>();

        loop {
            let res = if let Some(offset) = file_offset {
                // SAFETY:
                // Safe because we trust the kernel not to read past the length given and the
                // length is guaranteed to be valid from the pointer by io_slice_mut.
                unsafe {
                    libc::pwritev64(
                        self.registered_source.duped_fd.as_raw_fd(),
                        iovecs.as_ptr() as *mut _,
                        iovecs.len() as i32,
                        offset as libc::off64_t,
                    )
                }
            } else {
                // SAFETY:
                // Safe because we trust the kernel not to read past the length given and the
                // length is guaranteed to be valid from the pointer by io_slice_mut.
                unsafe {
                    libc::writev(
                        self.registered_source.duped_fd.as_raw_fd(),
                        iovecs.as_ptr() as *mut _,
                        iovecs.len() as i32,
                    )
                }
            };

            if res >= 0 {
                return Ok(res as usize);
            }

            match base::Error::last() {
                e if e.errno() == libc::EWOULDBLOCK => {
                    let op = self
                        .registered_source
                        .wait_writable()
                        .map_err(Error::AddingWaker)?;
                    op.await.map_err(Error::Executor)?;
                }
                e => return Err(Error::Write(e).into()),
            }
        }
    }

    /// Sync all completed write operations to the backing storage.
    pub async fn fsync(&self) -> AsyncResult<()> {
        // SAFETY: the duped_fd is valid and return value is checked.
        let ret = unsafe { libc::fsync(self.registered_source.duped_fd.as_raw_fd()) };
        if ret == 0 {
            Ok(())
        } else {
            Err(Error::Fsync(base::Error::last()).into())
        }
    }

    /// Punches a hole of `len` bytes in the backing file starting at `file_offset`.
    pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
        Ok(fallocate(
            &self.registered_source.duped_fd,
            FallocateMode::PunchHole,
            file_offset,
            len,
        )
        .map_err(Error::Fallocate)?)
    }

    /// Writes zeroes to `len` bytes of the backing file starting at `file_offset`.
    pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
        Ok(fallocate(
            &self.registered_source.duped_fd,
            FallocateMode::ZeroRange,
            file_offset,
            len,
        )
        .map_err(Error::Fallocate)?)
    }

    /// Sync all data of completed write operations to the backing storage, without updating
    /// extra metadata.
    pub async fn fdatasync(&self) -> AsyncResult<()> {
        // SAFETY: the duped_fd is valid and return value is checked.
        let ret = unsafe { libc::fdatasync(self.registered_source.duped_fd.as_raw_fd()) };
        if ret == 0 {
            Ok(())
        } else {
            Err(Error::Fdatasync(base::Error::last()).into())
        }
    }

    /// Yields the underlying IO source.
    pub fn into_source(self) -> F {
        self.registered_source.source
    }

    /// Provides a mutable ref to the underlying IO source.
    pub fn as_source_mut(&mut self) -> &mut F {
        &mut self.registered_source.source
    }

    /// Provides a ref to the underlying IO source.
    pub fn as_source(&self) -> &F {
        &self.registered_source.source
    }
}

// NOTE: Prefer adding tests to io_source.rs if not backend specific.
#[cfg(test)]
mod tests {
    use std::fs::File;

    use super::*;
    use crate::ExecutorTrait;

    #[test]
    fn memory_leak() {
        // This test needs to run under ASAN to detect memory leaks.

        async fn owns_poll_source(source: PollSource<File>) {
            let _ = source.wait_readable().await;
        }

        let (rx, _tx) = base::pipe().unwrap();
        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        let source = PollSource::new(rx, &ex).unwrap();
        ex.spawn_local(owns_poll_source(source)).detach();

        // Drop `ex` without running it. This would cause a memory leak if PollSource owned a
        // strong reference to the executor, because the executor owns a reference to the future
        // that owns PollSource (via its Runnable). The strong reference prevents the drop impl
        // from running, which would otherwise poll the future and have it return with an error.
    }
}
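
// Illustrative usage sketch (not part of the upstream test suite): shows how a `PollSource` is
// typically driven end to end by reading from a pipe with `read_to_vec`. It assumes that
// `base::pipe` returns `(read_end, write_end)` as used by `memory_leak` above and that
// `run_until` is available on the executor via `ExecutorTrait`; treat this as an example of the
// intended usage pattern rather than an exact copy of the crate's API surface.
#[cfg(test)]
mod usage_example {
    use std::io::Write;

    use super::*;
    use crate::ExecutorTrait;

    #[test]
    fn read_from_pipe_example() {
        let (rx, mut tx) = base::pipe().unwrap();
        // Fill the pipe first so the initial read completes without waiting on the reactor.
        tx.write_all(b"hello").unwrap();

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        let source = PollSource::new(rx, &ex).unwrap();

        // Run the async read to completion on the current thread. The outer unwrap handles the
        // executor-level result, the inner one the read itself.
        let (len, buf) = ex
            .run_until(source.read_to_vec(None, vec![0u8; 16]))
            .unwrap()
            .unwrap();
        assert_eq!(&buf[..len], b"hello");
    }
}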