xref: /aosp_15_r20/external/pigweed/pw_stream/rust/pw_stream/lib.rs (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 //! `pw_stream` provides `no_std` versions of Rust's [`std::io::Read`],
16 //! [`std::io::Write`], and [`std::io::Seek`] traits as well as a simplified
17 //! version of [`std::io::Cursor`].  One notable difference is that
18 //! [`pw_status::Error`] is used to avoid needing to do error conversion or
19 //! encapsulation.
20 #![deny(missing_docs)]
21 // Allows docs to reference `std`
22 #![cfg_attr(feature = "no_std", no_std)]
23 
24 use pw_status::{Error, Result};
25 
26 #[doc(hidden)]
27 mod cursor;
28 mod integer;
29 
30 pub use cursor::Cursor;
31 pub use integer::{ReadInteger, ReadVarint, WriteInteger, WriteVarint};
32 
33 /// A trait for objects that provide streaming read capability.
34 pub trait Read {
35     /// Read from a stream into a buffer.
36     ///
37     /// Semantics match [`std::io::Read::read()`].
read(&mut self, buf: &mut [u8]) -> Result<usize>38     fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
39 
40     /// Read exactly enough bytes to fill the buffer.
41     ///
42     /// Semantics match [`std::io::Read::read_exact()`].
read_exact(&mut self, mut buf: &mut [u8]) -> Result<()>43     fn read_exact(&mut self, mut buf: &mut [u8]) -> Result<()> {
44         while !buf.is_empty() {
45             let len = self.read(buf)?;
46 
47             // End of stream
48             if len == 0 {
49                 break;
50             }
51 
52             buf = &mut buf[len..];
53         }
54 
55         if !buf.is_empty() {
56             Err(Error::OutOfRange)
57         } else {
58             Ok(())
59         }
60     }
61 }
62 
63 /// A trait for objects that provide streaming write capability.
64 pub trait Write {
65     /// Write a buffer to a stream.
66     ///
67     /// Semantics match [`std::io::Write::write()`].
write(&mut self, buf: &[u8]) -> Result<usize>68     fn write(&mut self, buf: &[u8]) -> Result<usize>;
69 
70     /// Commit any outstanding buffered writes to underlying storage.
71     ///
72     /// Semantics match [`std::io::Write::flush()`].
flush(&mut self) -> Result<()>73     fn flush(&mut self) -> Result<()>;
74 
75     /// Writes entire buffer to stream.
76     ///
77     /// Semantics match [`std::io::Write::write_all()`].
write_all(&mut self, mut buf: &[u8]) -> Result<()>78     fn write_all(&mut self, mut buf: &[u8]) -> Result<()> {
79         while !buf.is_empty() {
80             let len = self.write(buf)?;
81 
82             // End of stream
83             if len == 0 {
84                 break;
85             }
86 
87             buf = &buf[len..];
88         }
89 
90         if !buf.is_empty() {
91             Err(Error::OutOfRange)
92         } else {
93             Ok(())
94         }
95     }
96 }
97 
98 /// A description of a seek operation in a stream.
99 ///
100 /// While `pw_stream` targets embedded platforms which are often natively
101 /// 32 bit, we believe that seek operation are relatively rare and the added
102 /// overhead of using 64 bit values for seeks is balanced by the ability
103 /// to support objects and operations over 4 GiB.
104 pub enum SeekFrom {
105     /// Seek from the start of the stream.
106     Start(u64),
107 
108     /// Seek from the end of the stream.
109     End(i64),
110 
111     /// Seek from the current position of the stream.
112     Current(i64),
113 }
114 
115 /// A trait for objects that provide the ability to seek withing a stream.
116 pub trait Seek {
117     /// Adjust the current position of the stream.
118     ///
119     /// Semantics match [`std::io::Seek::seek()`].
seek(&mut self, pos: SeekFrom) -> Result<u64>120     fn seek(&mut self, pos: SeekFrom) -> Result<u64>;
121 
122     /// Set the current position of the stream to its beginning.
123     ///
124     /// Semantics match [`std::io::Seek::rewind()`].
rewind(&mut self) -> Result<()>125     fn rewind(&mut self) -> Result<()> {
126         self.seek(SeekFrom::Start(0)).map(|_| ())
127     }
128 
129     /// Returns the length of the stream.
130     ///
131     /// Semantics match [`std::io::Seek::stream_len()`].
stream_len(&mut self) -> Result<u64>132     fn stream_len(&mut self) -> Result<u64> {
133         // Save original position.
134         let orig_pos = self.seek(SeekFrom::Current(0))?;
135 
136         // Seed to the end to discover stream length.
137         let end_pos = self.seek(SeekFrom::End(0))?;
138 
139         // Go back to original position.
140         self.seek(SeekFrom::Start(orig_pos))?;
141 
142         Ok(end_pos)
143     }
144 
145     /// Returns the current position of the stream.
146     ///
147     /// Semantics match [`std::io::Seek::stream_position()`].
stream_position(&mut self) -> Result<u64>148     fn stream_position(&mut self) -> Result<u64> {
149         self.seek(SeekFrom::Current(0))
150     }
151 }
152 
153 #[cfg(test)]
154 pub(crate) mod test_utils {
155     use super::{Seek, SeekFrom};
156 
test_rewind_resets_position_to_zero<const LEN: u64, T: Seek>(mut seeker: T)157     pub(crate) fn test_rewind_resets_position_to_zero<const LEN: u64, T: Seek>(mut seeker: T) {
158         seeker.seek(SeekFrom::Current(LEN as i64 / 2)).unwrap();
159         assert_eq!(seeker.stream_position().unwrap(), LEN / 2);
160         seeker.rewind().unwrap();
161         assert_eq!(seeker.stream_position().unwrap(), 0);
162     }
163 
test_stream_pos_reports_correct_position<const LEN: u64, T: Seek>(mut seeker: T)164     pub(crate) fn test_stream_pos_reports_correct_position<const LEN: u64, T: Seek>(mut seeker: T) {
165         assert_eq!(seeker.stream_position().unwrap(), 0);
166         seeker.seek(SeekFrom::Current(1)).unwrap();
167         assert_eq!(seeker.stream_position().unwrap(), 1);
168         seeker.seek(SeekFrom::Current(LEN as i64 / 2 - 1)).unwrap();
169         assert_eq!(seeker.stream_position().unwrap(), LEN / 2);
170         seeker.seek(SeekFrom::Current(0)).unwrap();
171         assert_eq!(seeker.stream_position().unwrap(), LEN / 2);
172         seeker.seek(SeekFrom::End(0)).unwrap();
173         assert_eq!(seeker.stream_position().unwrap(), LEN);
174     }
175 
test_stream_len_reports_correct_length<const LEN: u64, T: Seek>(mut seeker: T)176     pub(crate) fn test_stream_len_reports_correct_length<const LEN: u64, T: Seek>(mut seeker: T) {
177         assert_eq!(seeker.stream_len().unwrap(), LEN);
178     }
179 }
180 
181 #[cfg(test)]
182 mod tests {
183     use core::cmp::min;
184 
185     use super::test_utils::*;
186     use super::*;
187 
188     struct TestSeeker {
189         len: u64,
190         pos: u64,
191     }
192 
193     impl Seek for TestSeeker {
seek(&mut self, pos: SeekFrom) -> Result<u64>194         fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
195             let new_pos = match pos {
196                 SeekFrom::Start(pos) => pos,
197                 SeekFrom::Current(pos) => {
198                     self.pos.checked_add_signed(pos).ok_or(Error::OutOfRange)?
199                 }
200                 SeekFrom::End(pos) => self.len.checked_add_signed(-pos).ok_or(Error::OutOfRange)?,
201             };
202 
203             if new_pos > self.len {
204                 Err(Error::OutOfRange)
205             } else {
206                 self.pos = new_pos;
207                 Ok(new_pos)
208             }
209         }
210     }
211 
212     // A stream wrapper that limits reads and writes to a maximum chunk size.
213     struct ChunkedStreamAdapter<S: Read + Write + Seek> {
214         inner: S,
215         chunk_size: usize,
216         num_reads: u32,
217         num_writes: u32,
218     }
219 
220     impl<S: Read + Write + Seek> ChunkedStreamAdapter<S> {
new(inner: S, chunk_size: usize) -> Self221         fn new(inner: S, chunk_size: usize) -> Self {
222             Self {
223                 inner,
224                 chunk_size,
225                 num_reads: 0,
226                 num_writes: 0,
227             }
228         }
229     }
230 
231     impl<S: Read + Write + Seek> Read for ChunkedStreamAdapter<S> {
read(&mut self, buf: &mut [u8]) -> Result<usize>232         fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
233             let read_len = min(self.chunk_size, buf.len());
234             self.num_reads += 1;
235             self.inner.read(&mut buf[..read_len])
236         }
237     }
238 
239     impl<S: Read + Write + Seek> Write for ChunkedStreamAdapter<S> {
write(&mut self, buf: &[u8]) -> Result<usize>240         fn write(&mut self, buf: &[u8]) -> Result<usize> {
241             let write_len = min(self.chunk_size, buf.len());
242             self.num_writes += 1;
243             self.inner.write(&buf[..write_len])
244         }
245 
flush(&mut self) -> Result<()>246         fn flush(&mut self) -> Result<()> {
247             self.inner.flush()
248         }
249     }
250 
251     struct ErrorStream {
252         error: Error,
253     }
254 
255     impl Read for ErrorStream {
read(&mut self, _buf: &mut [u8]) -> Result<usize>256         fn read(&mut self, _buf: &mut [u8]) -> Result<usize> {
257             Err(self.error)
258         }
259     }
260 
261     impl Write for ErrorStream {
write(&mut self, _buf: &[u8]) -> Result<usize>262         fn write(&mut self, _buf: &[u8]) -> Result<usize> {
263             Err(self.error)
264         }
265 
flush(&mut self) -> Result<()>266         fn flush(&mut self) -> Result<()> {
267             Err(self.error)
268         }
269     }
270 
271     #[test]
default_rewind_impl_resets_position_to_zero()272     fn default_rewind_impl_resets_position_to_zero() {
273         test_rewind_resets_position_to_zero::<64, _>(TestSeeker { len: 64, pos: 0 });
274     }
275 
276     #[test]
default_stream_pos_impl_reports_correct_position()277     fn default_stream_pos_impl_reports_correct_position() {
278         test_stream_pos_reports_correct_position::<64, _>(TestSeeker { len: 64, pos: 0 });
279     }
280 
281     #[test]
default_stream_len_impl_reports_correct_length()282     fn default_stream_len_impl_reports_correct_length() {
283         test_stream_len_reports_correct_length::<64, _>(TestSeeker { len: 64, pos: 32 });
284     }
285 
286     #[test]
read_exact_reads_full_buffer_on_short_reads()287     fn read_exact_reads_full_buffer_on_short_reads() {
288         let cursor = Cursor::new((0x0..=0xff).collect::<Vec<u8>>());
289         // Limit reads to 10 bytes per read.
290         let mut wrapper = ChunkedStreamAdapter::new(cursor, 10);
291         let mut read_buffer = vec![0u8; 256];
292 
293         wrapper.read_exact(&mut read_buffer).unwrap();
294 
295         // Ensure that the correct bytes were read.
296         assert_eq!(wrapper.inner.into_inner(), read_buffer);
297 
298         // Verify that the read was broken up into the correct number of reads.
299         assert_eq!(wrapper.num_reads, 26);
300     }
301 
302     #[test]
read_exact_returns_error_on_too_little_data()303     fn read_exact_returns_error_on_too_little_data() {
304         let cursor = Cursor::new((0x0..=0x7f).collect::<Vec<u8>>());
305         // Limit reads to 10 bytes per read.
306         let mut wrapper = ChunkedStreamAdapter::new(cursor, 10);
307         let mut read_buffer = vec![0u8; 256];
308 
309         assert_eq!(wrapper.read_exact(&mut read_buffer), Err(Error::OutOfRange));
310     }
311 
312     #[test]
read_exact_propagates_read_errors()313     fn read_exact_propagates_read_errors() {
314         let mut error_stream = ErrorStream {
315             error: Error::Internal,
316         };
317         let mut read_buffer = vec![0u8; 256];
318         assert_eq!(
319             error_stream.read_exact(&mut read_buffer),
320             Err(Error::Internal)
321         );
322     }
323 
324     #[test]
write_all_writes_full_buffer_on_short_writes()325     fn write_all_writes_full_buffer_on_short_writes() {
326         let cursor = Cursor::new(vec![0u8; 256]);
327         // Limit writes to 10 bytes per write.
328         let mut wrapper = ChunkedStreamAdapter::new(cursor, 10);
329         let write_buffer = (0x0..=0xff).collect::<Vec<u8>>();
330 
331         wrapper.write_all(&write_buffer).unwrap();
332 
333         // Ensure that the correct bytes were written.
334         assert_eq!(wrapper.inner.into_inner(), write_buffer);
335 
336         // Verify that the write was broken up into the correct number of writes.
337         assert_eq!(wrapper.num_writes, 26);
338     }
339 
340     #[test]
write_all_returns_error_on_too_little_data()341     fn write_all_returns_error_on_too_little_data() {
342         let cursor = Cursor::new(vec![0u8; 128]);
343         // Limit writes to 10 bytes per write.
344         let mut wrapper = ChunkedStreamAdapter::new(cursor, 10);
345         let write_buffer = (0x0..=0xff).collect::<Vec<u8>>();
346 
347         assert_eq!(wrapper.write_all(&write_buffer), Err(Error::OutOfRange));
348     }
349 
350     #[test]
write_all_propagates_write_errors()351     fn write_all_propagates_write_errors() {
352         let mut error_stream = ErrorStream {
353             error: Error::Internal,
354         };
355         let write_buffer = (0x0..=0xff).collect::<Vec<u8>>();
356         assert_eq!(error_stream.write_all(&write_buffer), Err(Error::Internal));
357     }
358 }
359