xref: /aosp_15_r20/external/crosvm/cros_async/src/sys/linux/poll_source.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::os::fd::AsRawFd;
7 use std::sync::Arc;
8 
9 use base::sys::fallocate;
10 use base::sys::FallocateMode;
11 use base::AsRawDescriptor;
12 use base::VolatileSlice;
13 use remain::sorted;
14 use thiserror::Error as ThisError;
15 
16 use super::fd_executor;
17 use super::fd_executor::EpollReactor;
18 use super::fd_executor::RegisteredSource;
19 use crate::common_executor::RawExecutor;
20 use crate::mem::BackingMemory;
21 use crate::AsyncError;
22 use crate::AsyncResult;
23 use crate::MemRegion;
24 
#[sorted]
#[derive(ThisError, Debug)]
/// Errors reported by the poll-based IO source defined in this module.
///
/// Every variant wraps the lower-level error that caused it: either an
/// `fd_executor::Error` (waker registration / executor failures) or a
/// `base::Error` (failed system calls).
pub enum Error {
    /// An error occurred attempting to register a waker with the executor.
    #[error("An error occurred attempting to register a waker with the executor: {0}.")]
    AddingWaker(fd_executor::Error),
    /// Failed to discard a block.
    #[error("Failed to discard a block: {0}")]
    Discard(base::Error),
    /// An executor error occurred.
    #[error("An executor error occurred: {0}")]
    Executor(fd_executor::Error),
    /// An error occurred when executing fallocate synchronously.
    #[error("An error occurred when executing fallocate synchronously: {0}")]
    Fallocate(base::Error),
    /// An error occurred when executing fdatasync synchronously.
    #[error("An error occurred when executing fdatasync synchronously: {0}")]
    Fdatasync(base::Error),
    /// An error occurred when executing fsync synchronously.
    #[error("An error occurred when executing fsync synchronously: {0}")]
    Fsync(base::Error),
    /// An error occurred when reading the FD.
    #[error("An error occurred when reading the FD: {0}.")]
    Read(base::Error),
    /// An error occurred when seeking the FD.
    #[error("An error occurred when seeking the FD: {0}.")]
    Seeking(base::Error),
    /// An error occurred when writing the FD.
    #[error("An error occurred when writing the FD: {0}.")]
    Write(base::Error),
}
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
57 
58 impl From<Error> for io::Error {
from(e: Error) -> Self59     fn from(e: Error) -> Self {
60         use Error::*;
61         match e {
62             AddingWaker(e) => e.into(),
63             Executor(e) => e.into(),
64             Discard(e) => e.into(),
65             Fallocate(e) => e.into(),
66             Fdatasync(e) => e.into(),
67             Fsync(e) => e.into(),
68             Read(e) => e.into(),
69             Seeking(e) => e.into(),
70             Write(e) => e.into(),
71         }
72     }
73 }
74 
75 impl From<Error> for AsyncError {
from(e: Error) -> AsyncError76     fn from(e: Error) -> AsyncError {
77         AsyncError::SysVariants(e.into())
78     }
79 }
80 
/// Async wrapper for an IO source that uses the FD executor to drive async operations.
///
/// The wrapped source of type `F` is owned by the inner `RegisteredSource`.
pub struct PollSource<F> {
    // The IO source together with its registration on the executor; the IO
    // methods of this type reach both the original source and the duplicated
    // FD through it.
    registered_source: RegisteredSource<F>,
}
85 
86 impl<F: AsRawDescriptor> PollSource<F> {
87     /// Create a new `PollSource` from the given IO source.
new(f: F, ex: &Arc<RawExecutor<EpollReactor>>) -> Result<Self>88     pub fn new(f: F, ex: &Arc<RawExecutor<EpollReactor>>) -> Result<Self> {
89         RegisteredSource::new(ex, f)
90             .map({
91                 |f| PollSource {
92                     registered_source: f,
93                 }
94             })
95             .map_err(Error::Executor)
96     }
97 }
98 
99 impl<F: AsRawDescriptor> PollSource<F> {
100     /// Reads from the iosource at `file_offset` and fill the given `vec`.
read_to_vec( &self, file_offset: Option<u64>, mut vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>101     pub async fn read_to_vec(
102         &self,
103         file_offset: Option<u64>,
104         mut vec: Vec<u8>,
105     ) -> AsyncResult<(usize, Vec<u8>)> {
106         loop {
107             let res = if let Some(offset) = file_offset {
108                 // SAFETY:
109                 // Safe because this will only modify `vec` and we check the return value.
110                 unsafe {
111                     libc::pread64(
112                         self.registered_source.duped_fd.as_raw_fd(),
113                         vec.as_mut_ptr() as *mut libc::c_void,
114                         vec.len(),
115                         offset as libc::off64_t,
116                     )
117                 }
118             } else {
119                 // SAFETY:
120                 // Safe because this will only modify `vec` and we check the return value.
121                 unsafe {
122                     libc::read(
123                         self.registered_source.duped_fd.as_raw_fd(),
124                         vec.as_mut_ptr() as *mut libc::c_void,
125                         vec.len(),
126                     )
127                 }
128             };
129 
130             if res >= 0 {
131                 return Ok((res as usize, vec));
132             }
133 
134             match base::Error::last() {
135                 e if e.errno() == libc::EWOULDBLOCK => {
136                     let op = self
137                         .registered_source
138                         .wait_readable()
139                         .map_err(Error::AddingWaker)?;
140                     op.await.map_err(Error::Executor)?;
141                 }
142                 e => return Err(Error::Read(e).into()),
143             }
144         }
145     }
146 
147     /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
read_to_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>148     pub async fn read_to_mem(
149         &self,
150         file_offset: Option<u64>,
151         mem: Arc<dyn BackingMemory + Send + Sync>,
152         mem_offsets: impl IntoIterator<Item = MemRegion>,
153     ) -> AsyncResult<usize> {
154         let mut iovecs = mem_offsets
155             .into_iter()
156             .filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
157             .collect::<Vec<VolatileSlice>>();
158 
159         loop {
160             let res = if let Some(offset) = file_offset {
161                 // SAFETY:
162                 // Safe because we trust the kernel not to write path the length given and the
163                 // length is guaranteed to be valid from the pointer by
164                 // io_slice_mut.
165                 unsafe {
166                     libc::preadv64(
167                         self.registered_source.duped_fd.as_raw_fd(),
168                         iovecs.as_mut_ptr() as *mut _,
169                         iovecs.len() as i32,
170                         offset as libc::off64_t,
171                     )
172                 }
173             } else {
174                 // SAFETY:
175                 // Safe because we trust the kernel not to write path the length given and the
176                 // length is guaranteed to be valid from the pointer by
177                 // io_slice_mut.
178                 unsafe {
179                     libc::readv(
180                         self.registered_source.duped_fd.as_raw_fd(),
181                         iovecs.as_mut_ptr() as *mut _,
182                         iovecs.len() as i32,
183                     )
184                 }
185             };
186 
187             if res >= 0 {
188                 return Ok(res as usize);
189             }
190 
191             match base::Error::last() {
192                 e if e.errno() == libc::EWOULDBLOCK => {
193                     let op = self
194                         .registered_source
195                         .wait_readable()
196                         .map_err(Error::AddingWaker)?;
197                     op.await.map_err(Error::Executor)?;
198                 }
199                 e => return Err(Error::Read(e).into()),
200             }
201         }
202     }
203 
204     /// Wait for the FD of `self` to be readable.
wait_readable(&self) -> AsyncResult<()>205     pub async fn wait_readable(&self) -> AsyncResult<()> {
206         let op = self
207             .registered_source
208             .wait_readable()
209             .map_err(Error::AddingWaker)?;
210         op.await.map_err(Error::Executor)?;
211         Ok(())
212     }
213 
214     /// Writes from the given `vec` to the file starting at `file_offset`.
write_from_vec( &self, file_offset: Option<u64>, vec: Vec<u8>, ) -> AsyncResult<(usize, Vec<u8>)>215     pub async fn write_from_vec(
216         &self,
217         file_offset: Option<u64>,
218         vec: Vec<u8>,
219     ) -> AsyncResult<(usize, Vec<u8>)> {
220         loop {
221             let res = if let Some(offset) = file_offset {
222                 // SAFETY:
223                 // Safe because this will not modify any memory and we check the return value.
224                 unsafe {
225                     libc::pwrite64(
226                         self.registered_source.duped_fd.as_raw_fd(),
227                         vec.as_ptr() as *const libc::c_void,
228                         vec.len(),
229                         offset as libc::off64_t,
230                     )
231                 }
232             } else {
233                 // SAFETY:
234                 // Safe because this will not modify any memory and we check the return value.
235                 unsafe {
236                     libc::write(
237                         self.registered_source.duped_fd.as_raw_fd(),
238                         vec.as_ptr() as *const libc::c_void,
239                         vec.len(),
240                     )
241                 }
242             };
243 
244             if res >= 0 {
245                 return Ok((res as usize, vec));
246             }
247 
248             match base::Error::last() {
249                 e if e.errno() == libc::EWOULDBLOCK => {
250                     let op = self
251                         .registered_source
252                         .wait_writable()
253                         .map_err(Error::AddingWaker)?;
254                     op.await.map_err(Error::Executor)?;
255                 }
256                 e => return Err(Error::Write(e).into()),
257             }
258         }
259     }
260 
261     /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
write_from_mem( &self, file_offset: Option<u64>, mem: Arc<dyn BackingMemory + Send + Sync>, mem_offsets: impl IntoIterator<Item = MemRegion>, ) -> AsyncResult<usize>262     pub async fn write_from_mem(
263         &self,
264         file_offset: Option<u64>,
265         mem: Arc<dyn BackingMemory + Send + Sync>,
266         mem_offsets: impl IntoIterator<Item = MemRegion>,
267     ) -> AsyncResult<usize> {
268         let iovecs = mem_offsets
269             .into_iter()
270             .map(|mem_range| mem.get_volatile_slice(mem_range))
271             .filter_map(|r| r.ok())
272             .collect::<Vec<VolatileSlice>>();
273 
274         loop {
275             let res = if let Some(offset) = file_offset {
276                 // SAFETY:
277                 // Safe because we trust the kernel not to write path the length given and the
278                 // length is guaranteed to be valid from the pointer by
279                 // io_slice_mut.
280                 unsafe {
281                     libc::pwritev64(
282                         self.registered_source.duped_fd.as_raw_fd(),
283                         iovecs.as_ptr() as *mut _,
284                         iovecs.len() as i32,
285                         offset as libc::off64_t,
286                     )
287                 }
288             } else {
289                 // SAFETY:
290                 // Safe because we trust the kernel not to write path the length given and the
291                 // length is guaranteed to be valid from the pointer by
292                 // io_slice_mut.
293                 unsafe {
294                     libc::writev(
295                         self.registered_source.duped_fd.as_raw_fd(),
296                         iovecs.as_ptr() as *mut _,
297                         iovecs.len() as i32,
298                     )
299                 }
300             };
301 
302             if res >= 0 {
303                 return Ok(res as usize);
304             }
305 
306             match base::Error::last() {
307                 e if e.errno() == libc::EWOULDBLOCK => {
308                     let op = self
309                         .registered_source
310                         .wait_writable()
311                         .map_err(Error::AddingWaker)?;
312                     op.await.map_err(Error::Executor)?;
313                 }
314                 e => return Err(Error::Write(e).into()),
315             }
316         }
317     }
318 
319     /// # Safety
320     ///
321     /// Sync all completed write operations to the backing storage.
fsync(&self) -> AsyncResult<()>322     pub async fn fsync(&self) -> AsyncResult<()> {
323         // SAFETY: the duped_fd is valid and return value is checked.
324         let ret = unsafe { libc::fsync(self.registered_source.duped_fd.as_raw_fd()) };
325         if ret == 0 {
326             Ok(())
327         } else {
328             Err(Error::Fsync(base::Error::last()).into())
329         }
330     }
331 
332     /// punch_hole
punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()>333     pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
334         Ok(fallocate(
335             &self.registered_source.duped_fd,
336             FallocateMode::PunchHole,
337             file_offset,
338             len,
339         )
340         .map_err(Error::Fallocate)?)
341     }
342 
343     /// write_zeroes_at
write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()>344     pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
345         Ok(fallocate(
346             &self.registered_source.duped_fd,
347             FallocateMode::ZeroRange,
348             file_offset,
349             len,
350         )
351         .map_err(Error::Fallocate)?)
352     }
353 
354     /// Sync all data of completed write operations to the backing storage, avoiding updating extra
355     /// metadata.
fdatasync(&self) -> AsyncResult<()>356     pub async fn fdatasync(&self) -> AsyncResult<()> {
357         // SAFETY: the duped_fd is valid and return value is checked.
358         let ret = unsafe { libc::fdatasync(self.registered_source.duped_fd.as_raw_fd()) };
359         if ret == 0 {
360             Ok(())
361         } else {
362             Err(Error::Fdatasync(base::Error::last()).into())
363         }
364     }
365 
366     /// Yields the underlying IO source.
into_source(self) -> F367     pub fn into_source(self) -> F {
368         self.registered_source.source
369     }
370 
371     /// Provides a mutable ref to the underlying IO source.
as_source_mut(&mut self) -> &mut F372     pub fn as_source_mut(&mut self) -> &mut F {
373         &mut self.registered_source.source
374     }
375 
376     /// Provides a ref to the underlying IO source.
as_source(&self) -> &F377     pub fn as_source(&self) -> &F {
378         &self.registered_source.source
379     }
380 }
381 
382 // NOTE: Prefer adding tests to io_source.rs if not backend specific.
#[cfg(test)]
mod tests {
    use std::fs::File;

    use super::*;
    use crate::ExecutorTrait;

    /// Spawns a task that owns a `PollSource` and then drops the executor
    /// without ever running it.
    #[test]
    fn memory_leak() {
        // This test needs to run under ASAN to detect memory leaks.

        // A future that takes ownership of the `PollSource` and waits on it.
        async fn owns_poll_source(source: PollSource<File>) {
            let _ = source.wait_readable().await;
        }

        // `_tx` stays bound (not `_`) so the other pipe end lives until the end of the test.
        let (rx, _tx) = base::pipe().unwrap();
        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        let source = PollSource::new(rx, &ex).unwrap();
        // Detach so that only the executor keeps the task alive.
        ex.spawn_local(owns_poll_source(source)).detach();

        // Drop `ex` without running. This would cause a memory leak if PollSource owned a strong
        // reference to the executor because it owns a reference to the future that owns PollSource
        // (via its Runnable). The strong reference prevents the drop impl from running, which would
        // otherwise poll the future and have it return with an error.
    }
}
409