1 use std::{
2     convert::TryInto,
3     fmt,
4     hash::Hasher,
5     io::{self, BufRead, ErrorKind},
6     mem::size_of,
7 };
8 use twox_hash::XxHash32;
9 
10 use super::header::{
11     BlockInfo, BlockMode, FrameInfo, LZ4F_LEGACY_MAGIC_NUMBER, MAGIC_NUMBER_SIZE,
12     MAX_FRAME_INFO_SIZE, MIN_FRAME_INFO_SIZE,
13 };
14 use super::Error;
15 use crate::{
16     block::WINDOW_SIZE,
17     sink::{vec_sink_for_decompression, SliceSink},
18 };
19 
20 /// A reader for decompressing the LZ4 frame format
21 ///
22 /// This Decoder wraps any other reader that implements `io::Read`.
23 /// Bytes read will be decompressed according to the [LZ4 frame format](
24 /// https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md).
25 ///
26 /// # Example 1
27 /// Deserializing json values out of a compressed file.
28 ///
29 /// ```no_run
30 /// let compressed_input = std::fs::File::open("datafile").unwrap();
31 /// let mut decompressed_input = lz4_flex::frame::FrameDecoder::new(compressed_input);
32 /// let json: serde_json::Value = serde_json::from_reader(decompressed_input).unwrap();
33 /// ```
34 ///
35 /// # Example
36 /// Deserializing multiple json values out of a compressed file
37 ///
38 /// ```no_run
39 /// let compressed_input = std::fs::File::open("datafile").unwrap();
40 /// let mut decompressed_input = lz4_flex::frame::FrameDecoder::new(compressed_input);
41 /// loop {
42 ///     match serde_json::from_reader::<_, serde_json::Value>(&mut decompressed_input) {
43 ///         Ok(json) => { println!("json {:?}", json); }
44 ///         Err(e) if e.is_eof() => break,
45 ///         Err(e) => panic!("{}", e),
46 ///     }
47 /// }
48 /// ```
49 pub struct FrameDecoder<R: io::Read> {
50     /// The underlying reader.
51     r: R,
52     /// The FrameInfo of the frame currently being decoded.
53     /// It starts as `None` and is filled with the FrameInfo is read from the input.
54     /// It's reset to `None` once the frame EndMarker is read from the input.
55     current_frame_info: Option<FrameInfo>,
56     /// Xxhash32 used when content checksum is enabled.
57     content_hasher: XxHash32,
58     /// Total length of decompressed output for the current frame.
59     content_len: u64,
60     /// The compressed bytes buffer, taken from the underlying reader.
61     src: Vec<u8>,
62     /// The decompressed bytes buffer. Bytes are decompressed from src to dst
63     /// before being passed back to the caller.
64     dst: Vec<u8>,
65     /// Index into dst and length: starting point of bytes previously output
66     /// that are still part of the decompressor window.
67     ext_dict_offset: usize,
68     ext_dict_len: usize,
69     /// Index into dst: starting point of bytes not yet read by caller.
70     dst_start: usize,
71     /// Index into dst: ending point of bytes not yet read by caller.
72     dst_end: usize,
73 }
74 
75 impl<R: io::Read> FrameDecoder<R> {
76     /// Creates a new Decoder for the specified reader.
new(rdr: R) -> FrameDecoder<R>77     pub fn new(rdr: R) -> FrameDecoder<R> {
78         FrameDecoder {
79             r: rdr,
80             src: Default::default(),
81             dst: Default::default(),
82             ext_dict_offset: 0,
83             ext_dict_len: 0,
84             dst_start: 0,
85             dst_end: 0,
86             current_frame_info: None,
87             content_hasher: XxHash32::with_seed(0),
88             content_len: 0,
89         }
90     }
91 
92     /// Gets a reference to the underlying reader in this decoder.
get_ref(&self) -> &R93     pub fn get_ref(&self) -> &R {
94         &self.r
95     }
96 
97     /// Gets a mutable reference to the underlying reader in this decoder.
98     ///
99     /// Note that mutation of the stream may result in surprising results if
100     /// this decoder is continued to be used.
get_mut(&mut self) -> &mut R101     pub fn get_mut(&mut self) -> &mut R {
102         &mut self.r
103     }
104 
105     /// Consumes the FrameDecoder and returns the underlying reader.
into_inner(self) -> R106     pub fn into_inner(self) -> R {
107         self.r
108     }
109 
read_frame_info(&mut self) -> Result<usize, io::Error>110     fn read_frame_info(&mut self) -> Result<usize, io::Error> {
111         let mut buffer = [0u8; MAX_FRAME_INFO_SIZE];
112 
113         match self.r.read(&mut buffer[..MAGIC_NUMBER_SIZE])? {
114             0 => return Ok(0),
115             MAGIC_NUMBER_SIZE => (),
116             read => self.r.read_exact(&mut buffer[read..MAGIC_NUMBER_SIZE])?,
117         }
118 
119         if u32::from_le_bytes(buffer[0..MAGIC_NUMBER_SIZE].try_into().unwrap())
120             != LZ4F_LEGACY_MAGIC_NUMBER
121         {
122             match self
123                 .r
124                 .read(&mut buffer[MAGIC_NUMBER_SIZE..MIN_FRAME_INFO_SIZE])?
125             {
126                 0 => return Ok(0),
127                 MIN_FRAME_INFO_SIZE => (),
128                 read => self
129                     .r
130                     .read_exact(&mut buffer[MAGIC_NUMBER_SIZE + read..MIN_FRAME_INFO_SIZE])?,
131             }
132         }
133         let required = FrameInfo::read_size(&buffer[..MIN_FRAME_INFO_SIZE])?;
134         if required != MIN_FRAME_INFO_SIZE && required != MAGIC_NUMBER_SIZE {
135             self.r
136                 .read_exact(&mut buffer[MIN_FRAME_INFO_SIZE..required])?;
137         }
138 
139         let frame_info = FrameInfo::read(&buffer[..required])?;
140         if frame_info.dict_id.is_some() {
141             // Unsupported right now so it must be None
142             return Err(Error::DictionaryNotSupported.into());
143         }
144 
145         let max_block_size = frame_info.block_size.get_size();
146         let dst_size = if frame_info.block_mode == BlockMode::Linked {
147             // In linked mode we consume the output (bumping dst_start) but leave the
148             // beginning of dst to be used as a prefix in subsequent blocks.
149             // That is at least until we have at least `max_block_size + WINDOW_SIZE`
150             // bytes in dst, then we setup an ext_dict with the last WINDOW_SIZE bytes
151             // and the output goes to the beginning of dst again.
152             // Since we always want to be able to write a full block (up to max_block_size)
153             // we need a buffer with at least `max_block_size * 2 + WINDOW_SIZE` bytes.
154             max_block_size * 2 + WINDOW_SIZE
155         } else {
156             max_block_size
157         };
158         self.src.clear();
159         self.dst.clear();
160         self.src.reserve_exact(max_block_size);
161         self.dst.reserve_exact(dst_size);
162         self.current_frame_info = Some(frame_info);
163         self.content_hasher = XxHash32::with_seed(0);
164         self.content_len = 0;
165         self.ext_dict_len = 0;
166         self.dst_start = 0;
167         self.dst_end = 0;
168         Ok(required)
169     }
170 
171     #[inline]
read_checksum(r: &mut R) -> Result<u32, io::Error>172     fn read_checksum(r: &mut R) -> Result<u32, io::Error> {
173         let mut checksum_buffer = [0u8; size_of::<u32>()];
174         r.read_exact(&mut checksum_buffer[..])?;
175         let checksum = u32::from_le_bytes(checksum_buffer);
176         Ok(checksum)
177     }
178 
179     #[inline]
check_block_checksum(data: &[u8], expected_checksum: u32) -> Result<(), io::Error>180     fn check_block_checksum(data: &[u8], expected_checksum: u32) -> Result<(), io::Error> {
181         let mut block_hasher = XxHash32::with_seed(0);
182         block_hasher.write(data);
183         let calc_checksum = block_hasher.finish() as u32;
184         if calc_checksum != expected_checksum {
185             return Err(Error::BlockChecksumError.into());
186         }
187         Ok(())
188     }
189 
read_block(&mut self) -> io::Result<usize>190     fn read_block(&mut self) -> io::Result<usize> {
191         debug_assert_eq!(self.dst_start, self.dst_end);
192         let frame_info = self.current_frame_info.as_ref().unwrap();
193 
194         // Adjust dst buffer offsets to decompress the next block
195         let max_block_size = frame_info.block_size.get_size();
196         if frame_info.block_mode == BlockMode::Linked {
197             // In linked mode we consume the output (bumping dst_start) but leave the
198             // beginning of dst to be used as a prefix in subsequent blocks.
199             // That is at least until we have at least `max_block_size + WINDOW_SIZE`
200             // bytes in dst, then we setup an ext_dict with the last WINDOW_SIZE bytes
201             // and the output goes to the beginning of dst again.
202             debug_assert_eq!(self.dst.capacity(), max_block_size * 2 + WINDOW_SIZE);
203             if self.dst_start + max_block_size > self.dst.capacity() {
204                 // Output might not fit in the buffer.
205                 // The ext_dict will become the last WINDOW_SIZE bytes
206                 debug_assert!(self.dst_start >= max_block_size + WINDOW_SIZE);
207                 self.ext_dict_offset = self.dst_start - WINDOW_SIZE;
208                 self.ext_dict_len = WINDOW_SIZE;
209                 // Output goes in the beginning of the buffer again.
210                 self.dst_start = 0;
211                 self.dst_end = 0;
212             } else if self.dst_start + self.ext_dict_len > WINDOW_SIZE {
213                 // There's more than WINDOW_SIZE bytes of lookback adding the prefix and ext_dict.
214                 // Since we have a limited buffer we must shrink ext_dict in favor of the prefix,
215                 // so that we can fit up to max_block_size bytes between dst_start and ext_dict
216                 // start.
217                 let delta = self
218                     .ext_dict_len
219                     .min(self.dst_start + self.ext_dict_len - WINDOW_SIZE);
220                 self.ext_dict_offset += delta;
221                 self.ext_dict_len -= delta;
222                 debug_assert!(self.dst_start + self.ext_dict_len >= WINDOW_SIZE)
223             }
224         } else {
225             debug_assert_eq!(self.ext_dict_len, 0);
226             debug_assert_eq!(self.dst.capacity(), max_block_size);
227             self.dst_start = 0;
228             self.dst_end = 0;
229         }
230 
231         // Read and decompress block
232         let block_info = {
233             let mut buffer = [0u8; 4];
234             if let Err(err) = self.r.read_exact(&mut buffer) {
235                 if err.kind() == ErrorKind::UnexpectedEof {
236                     return Ok(0);
237                 } else {
238                     return Err(err);
239                 }
240             }
241             BlockInfo::read(&buffer)?
242         };
243         match block_info {
244             BlockInfo::Uncompressed(len) => {
245                 let len = len as usize;
246                 if len > max_block_size {
247                     return Err(Error::BlockTooBig.into());
248                 }
249                 // TODO: Attempt to avoid initialization of read buffer when
250                 // https://github.com/rust-lang/rust/issues/42788 stabilizes
251                 self.r.read_exact(vec_resize_and_get_mut(
252                     &mut self.dst,
253                     self.dst_start,
254                     self.dst_start + len,
255                 ))?;
256                 if frame_info.block_checksums {
257                     let expected_checksum = Self::read_checksum(&mut self.r)?;
258                     Self::check_block_checksum(
259                         &self.dst[self.dst_start..self.dst_start + len],
260                         expected_checksum,
261                     )?;
262                 }
263 
264                 self.dst_end += len;
265                 self.content_len += len as u64;
266             }
267             BlockInfo::Compressed(len) => {
268                 let len = len as usize;
269                 if len > max_block_size {
270                     return Err(Error::BlockTooBig.into());
271                 }
272                 // TODO: Attempt to avoid initialization of read buffer when
273                 // https://github.com/rust-lang/rust/issues/42788 stabilizes
274                 self.r
275                     .read_exact(vec_resize_and_get_mut(&mut self.src, 0, len))?;
276                 if frame_info.block_checksums {
277                     let expected_checksum = Self::read_checksum(&mut self.r)?;
278                     Self::check_block_checksum(&self.src[..len], expected_checksum)?;
279                 }
280 
281                 let with_dict_mode =
282                     frame_info.block_mode == BlockMode::Linked && self.ext_dict_len != 0;
283                 let decomp_size = if with_dict_mode {
284                     debug_assert!(self.dst_start + max_block_size <= self.ext_dict_offset);
285                     let (head, tail) = self.dst.split_at_mut(self.ext_dict_offset);
286                     let ext_dict = &tail[..self.ext_dict_len];
287 
288                     debug_assert!(head.len() - self.dst_start >= max_block_size);
289                     crate::block::decompress::decompress_internal::<true, _>(
290                         &self.src[..len],
291                         &mut SliceSink::new(head, self.dst_start),
292                         ext_dict,
293                     )
294                 } else {
295                     // Independent blocks OR linked blocks with only prefix data
296                     debug_assert!(self.dst.capacity() - self.dst_start >= max_block_size);
297                     crate::block::decompress::decompress_internal::<false, _>(
298                         &self.src[..len],
299                         &mut vec_sink_for_decompression(
300                             &mut self.dst,
301                             0,
302                             self.dst_start,
303                             self.dst_start + max_block_size,
304                         ),
305                         b"",
306                     )
307                 }
308                 .map_err(Error::DecompressionError)?;
309 
310                 self.dst_end += decomp_size;
311                 self.content_len += decomp_size as u64;
312             }
313 
314             BlockInfo::EndMark => {
315                 if let Some(expected) = frame_info.content_size {
316                     if self.content_len != expected {
317                         return Err(Error::ContentLengthError {
318                             expected,
319                             actual: self.content_len,
320                         }
321                         .into());
322                     }
323                 }
324                 if frame_info.content_checksum {
325                     let expected_checksum = Self::read_checksum(&mut self.r)?;
326                     let calc_checksum = self.content_hasher.finish() as u32;
327                     if calc_checksum != expected_checksum {
328                         return Err(Error::ContentChecksumError.into());
329                     }
330                 }
331                 self.current_frame_info = None;
332                 return Ok(0);
333             }
334         }
335 
336         // Content checksum, if applicable
337         if frame_info.content_checksum {
338             self.content_hasher
339                 .write(&self.dst[self.dst_start..self.dst_end]);
340         }
341 
342         Ok(self.dst_end - self.dst_start)
343     }
344 
read_more(&mut self) -> io::Result<usize>345     fn read_more(&mut self) -> io::Result<usize> {
346         if self.current_frame_info.is_none() && self.read_frame_info()? == 0 {
347             return Ok(0);
348         }
349         self.read_block()
350     }
351 }
352 
353 impl<R: io::Read> io::Read for FrameDecoder<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>354     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
355         loop {
356             // Fill read buffer if there's uncompressed data left
357             if self.dst_start < self.dst_end {
358                 let read_len = std::cmp::min(self.dst_end - self.dst_start, buf.len());
359                 let dst_read_end = self.dst_start + read_len;
360                 buf[..read_len].copy_from_slice(&self.dst[self.dst_start..dst_read_end]);
361                 self.dst_start = dst_read_end;
362                 return Ok(read_len);
363             }
364             if self.read_more()? == 0 {
365                 return Ok(0);
366             }
367         }
368     }
369 
read_to_string(&mut self, buf: &mut String) -> io::Result<usize>370     fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
371         let mut written = 0;
372         loop {
373             match self.fill_buf() {
374                 Ok(b) if b.is_empty() => return Ok(written),
375                 Ok(b) => {
376                     let s = std::str::from_utf8(b).map_err(|_| {
377                         io::Error::new(
378                             io::ErrorKind::InvalidData,
379                             "stream did not contain valid UTF-8",
380                         )
381                     })?;
382                     buf.push_str(s);
383                     let len = s.len();
384                     self.consume(len);
385                     written += len;
386                 }
387                 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
388                 Err(e) => return Err(e),
389             }
390         }
391     }
392 
read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize>393     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
394         let mut written = 0;
395         loop {
396             match self.fill_buf() {
397                 Ok(b) if b.is_empty() => return Ok(written),
398                 Ok(b) => {
399                     buf.extend_from_slice(b);
400                     let len = b.len();
401                     self.consume(len);
402                     written += len;
403                 }
404                 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
405                 Err(e) => return Err(e),
406             }
407         }
408     }
409 }
410 
411 impl<R: io::Read> io::BufRead for FrameDecoder<R> {
fill_buf(&mut self) -> io::Result<&[u8]>412     fn fill_buf(&mut self) -> io::Result<&[u8]> {
413         if self.dst_start == self.dst_end {
414             self.read_more()?;
415         }
416         Ok(&self.dst[self.dst_start..self.dst_end])
417     }
418 
consume(&mut self, amt: usize)419     fn consume(&mut self, amt: usize) {
420         assert!(amt <= self.dst_end - self.dst_start);
421         self.dst_start += amt;
422     }
423 }
424 
425 impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result426     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
427         f.debug_struct("FrameDecoder")
428             .field("r", &self.r)
429             .field("content_hasher", &self.content_hasher)
430             .field("content_len", &self.content_len)
431             .field("src", &"[...]")
432             .field("dst", &"[...]")
433             .field("dst_start", &self.dst_start)
434             .field("dst_end", &self.dst_end)
435             .field("ext_dict_offset", &self.ext_dict_offset)
436             .field("ext_dict_len", &self.ext_dict_len)
437             .field("current_frame_info", &self.current_frame_info)
438             .finish()
439     }
440 }
441 
/// Returns `v[start..end]` mutably, first zero-extending `v` if `end` lies past
/// its current length (like `v.get_mut(start..end)`, but growing instead of
/// returning `None`).
///
/// The vector is never shrunk. Panics if `start > end`.
#[inline]
fn vec_resize_and_get_mut(v: &mut Vec<u8>, start: usize, end: usize) -> &mut [u8] {
    let target_len = v.len().max(end);
    v.resize(target_len, 0);
    &mut v[start..end]
}
450