1 use std::{
2 convert::TryInto,
3 fmt,
4 hash::Hasher,
5 io::{self, BufRead, ErrorKind},
6 mem::size_of,
7 };
8 use twox_hash::XxHash32;
9
10 use super::header::{
11 BlockInfo, BlockMode, FrameInfo, LZ4F_LEGACY_MAGIC_NUMBER, MAGIC_NUMBER_SIZE,
12 MAX_FRAME_INFO_SIZE, MIN_FRAME_INFO_SIZE,
13 };
14 use super::Error;
15 use crate::{
16 block::WINDOW_SIZE,
17 sink::{vec_sink_for_decompression, SliceSink},
18 };
19
20 /// A reader for decompressing the LZ4 frame format
21 ///
22 /// This Decoder wraps any other reader that implements `io::Read`.
23 /// Bytes read will be decompressed according to the [LZ4 frame format](
24 /// https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md).
25 ///
26 /// # Example 1
27 /// Deserializing json values out of a compressed file.
28 ///
29 /// ```no_run
30 /// let compressed_input = std::fs::File::open("datafile").unwrap();
31 /// let mut decompressed_input = lz4_flex::frame::FrameDecoder::new(compressed_input);
32 /// let json: serde_json::Value = serde_json::from_reader(decompressed_input).unwrap();
33 /// ```
34 ///
35 /// # Example
36 /// Deserializing multiple json values out of a compressed file
37 ///
38 /// ```no_run
39 /// let compressed_input = std::fs::File::open("datafile").unwrap();
40 /// let mut decompressed_input = lz4_flex::frame::FrameDecoder::new(compressed_input);
41 /// loop {
42 /// match serde_json::from_reader::<_, serde_json::Value>(&mut decompressed_input) {
43 /// Ok(json) => { println!("json {:?}", json); }
44 /// Err(e) if e.is_eof() => break,
45 /// Err(e) => panic!("{}", e),
46 /// }
47 /// }
48 /// ```
49 pub struct FrameDecoder<R: io::Read> {
50 /// The underlying reader.
51 r: R,
52 /// The FrameInfo of the frame currently being decoded.
53 /// It starts as `None` and is filled with the FrameInfo is read from the input.
54 /// It's reset to `None` once the frame EndMarker is read from the input.
55 current_frame_info: Option<FrameInfo>,
56 /// Xxhash32 used when content checksum is enabled.
57 content_hasher: XxHash32,
58 /// Total length of decompressed output for the current frame.
59 content_len: u64,
60 /// The compressed bytes buffer, taken from the underlying reader.
61 src: Vec<u8>,
62 /// The decompressed bytes buffer. Bytes are decompressed from src to dst
63 /// before being passed back to the caller.
64 dst: Vec<u8>,
65 /// Index into dst and length: starting point of bytes previously output
66 /// that are still part of the decompressor window.
67 ext_dict_offset: usize,
68 ext_dict_len: usize,
69 /// Index into dst: starting point of bytes not yet read by caller.
70 dst_start: usize,
71 /// Index into dst: ending point of bytes not yet read by caller.
72 dst_end: usize,
73 }
74
75 impl<R: io::Read> FrameDecoder<R> {
76 /// Creates a new Decoder for the specified reader.
new(rdr: R) -> FrameDecoder<R>77 pub fn new(rdr: R) -> FrameDecoder<R> {
78 FrameDecoder {
79 r: rdr,
80 src: Default::default(),
81 dst: Default::default(),
82 ext_dict_offset: 0,
83 ext_dict_len: 0,
84 dst_start: 0,
85 dst_end: 0,
86 current_frame_info: None,
87 content_hasher: XxHash32::with_seed(0),
88 content_len: 0,
89 }
90 }
91
92 /// Gets a reference to the underlying reader in this decoder.
get_ref(&self) -> &R93 pub fn get_ref(&self) -> &R {
94 &self.r
95 }
96
97 /// Gets a mutable reference to the underlying reader in this decoder.
98 ///
99 /// Note that mutation of the stream may result in surprising results if
100 /// this decoder is continued to be used.
get_mut(&mut self) -> &mut R101 pub fn get_mut(&mut self) -> &mut R {
102 &mut self.r
103 }
104
105 /// Consumes the FrameDecoder and returns the underlying reader.
into_inner(self) -> R106 pub fn into_inner(self) -> R {
107 self.r
108 }
109
read_frame_info(&mut self) -> Result<usize, io::Error>110 fn read_frame_info(&mut self) -> Result<usize, io::Error> {
111 let mut buffer = [0u8; MAX_FRAME_INFO_SIZE];
112
113 match self.r.read(&mut buffer[..MAGIC_NUMBER_SIZE])? {
114 0 => return Ok(0),
115 MAGIC_NUMBER_SIZE => (),
116 read => self.r.read_exact(&mut buffer[read..MAGIC_NUMBER_SIZE])?,
117 }
118
119 if u32::from_le_bytes(buffer[0..MAGIC_NUMBER_SIZE].try_into().unwrap())
120 != LZ4F_LEGACY_MAGIC_NUMBER
121 {
122 match self
123 .r
124 .read(&mut buffer[MAGIC_NUMBER_SIZE..MIN_FRAME_INFO_SIZE])?
125 {
126 0 => return Ok(0),
127 MIN_FRAME_INFO_SIZE => (),
128 read => self
129 .r
130 .read_exact(&mut buffer[MAGIC_NUMBER_SIZE + read..MIN_FRAME_INFO_SIZE])?,
131 }
132 }
133 let required = FrameInfo::read_size(&buffer[..MIN_FRAME_INFO_SIZE])?;
134 if required != MIN_FRAME_INFO_SIZE && required != MAGIC_NUMBER_SIZE {
135 self.r
136 .read_exact(&mut buffer[MIN_FRAME_INFO_SIZE..required])?;
137 }
138
139 let frame_info = FrameInfo::read(&buffer[..required])?;
140 if frame_info.dict_id.is_some() {
141 // Unsupported right now so it must be None
142 return Err(Error::DictionaryNotSupported.into());
143 }
144
145 let max_block_size = frame_info.block_size.get_size();
146 let dst_size = if frame_info.block_mode == BlockMode::Linked {
147 // In linked mode we consume the output (bumping dst_start) but leave the
148 // beginning of dst to be used as a prefix in subsequent blocks.
149 // That is at least until we have at least `max_block_size + WINDOW_SIZE`
150 // bytes in dst, then we setup an ext_dict with the last WINDOW_SIZE bytes
151 // and the output goes to the beginning of dst again.
152 // Since we always want to be able to write a full block (up to max_block_size)
153 // we need a buffer with at least `max_block_size * 2 + WINDOW_SIZE` bytes.
154 max_block_size * 2 + WINDOW_SIZE
155 } else {
156 max_block_size
157 };
158 self.src.clear();
159 self.dst.clear();
160 self.src.reserve_exact(max_block_size);
161 self.dst.reserve_exact(dst_size);
162 self.current_frame_info = Some(frame_info);
163 self.content_hasher = XxHash32::with_seed(0);
164 self.content_len = 0;
165 self.ext_dict_len = 0;
166 self.dst_start = 0;
167 self.dst_end = 0;
168 Ok(required)
169 }
170
171 #[inline]
read_checksum(r: &mut R) -> Result<u32, io::Error>172 fn read_checksum(r: &mut R) -> Result<u32, io::Error> {
173 let mut checksum_buffer = [0u8; size_of::<u32>()];
174 r.read_exact(&mut checksum_buffer[..])?;
175 let checksum = u32::from_le_bytes(checksum_buffer);
176 Ok(checksum)
177 }
178
179 #[inline]
check_block_checksum(data: &[u8], expected_checksum: u32) -> Result<(), io::Error>180 fn check_block_checksum(data: &[u8], expected_checksum: u32) -> Result<(), io::Error> {
181 let mut block_hasher = XxHash32::with_seed(0);
182 block_hasher.write(data);
183 let calc_checksum = block_hasher.finish() as u32;
184 if calc_checksum != expected_checksum {
185 return Err(Error::BlockChecksumError.into());
186 }
187 Ok(())
188 }
189
read_block(&mut self) -> io::Result<usize>190 fn read_block(&mut self) -> io::Result<usize> {
191 debug_assert_eq!(self.dst_start, self.dst_end);
192 let frame_info = self.current_frame_info.as_ref().unwrap();
193
194 // Adjust dst buffer offsets to decompress the next block
195 let max_block_size = frame_info.block_size.get_size();
196 if frame_info.block_mode == BlockMode::Linked {
197 // In linked mode we consume the output (bumping dst_start) but leave the
198 // beginning of dst to be used as a prefix in subsequent blocks.
199 // That is at least until we have at least `max_block_size + WINDOW_SIZE`
200 // bytes in dst, then we setup an ext_dict with the last WINDOW_SIZE bytes
201 // and the output goes to the beginning of dst again.
202 debug_assert_eq!(self.dst.capacity(), max_block_size * 2 + WINDOW_SIZE);
203 if self.dst_start + max_block_size > self.dst.capacity() {
204 // Output might not fit in the buffer.
205 // The ext_dict will become the last WINDOW_SIZE bytes
206 debug_assert!(self.dst_start >= max_block_size + WINDOW_SIZE);
207 self.ext_dict_offset = self.dst_start - WINDOW_SIZE;
208 self.ext_dict_len = WINDOW_SIZE;
209 // Output goes in the beginning of the buffer again.
210 self.dst_start = 0;
211 self.dst_end = 0;
212 } else if self.dst_start + self.ext_dict_len > WINDOW_SIZE {
213 // There's more than WINDOW_SIZE bytes of lookback adding the prefix and ext_dict.
214 // Since we have a limited buffer we must shrink ext_dict in favor of the prefix,
215 // so that we can fit up to max_block_size bytes between dst_start and ext_dict
216 // start.
217 let delta = self
218 .ext_dict_len
219 .min(self.dst_start + self.ext_dict_len - WINDOW_SIZE);
220 self.ext_dict_offset += delta;
221 self.ext_dict_len -= delta;
222 debug_assert!(self.dst_start + self.ext_dict_len >= WINDOW_SIZE)
223 }
224 } else {
225 debug_assert_eq!(self.ext_dict_len, 0);
226 debug_assert_eq!(self.dst.capacity(), max_block_size);
227 self.dst_start = 0;
228 self.dst_end = 0;
229 }
230
231 // Read and decompress block
232 let block_info = {
233 let mut buffer = [0u8; 4];
234 if let Err(err) = self.r.read_exact(&mut buffer) {
235 if err.kind() == ErrorKind::UnexpectedEof {
236 return Ok(0);
237 } else {
238 return Err(err);
239 }
240 }
241 BlockInfo::read(&buffer)?
242 };
243 match block_info {
244 BlockInfo::Uncompressed(len) => {
245 let len = len as usize;
246 if len > max_block_size {
247 return Err(Error::BlockTooBig.into());
248 }
249 // TODO: Attempt to avoid initialization of read buffer when
250 // https://github.com/rust-lang/rust/issues/42788 stabilizes
251 self.r.read_exact(vec_resize_and_get_mut(
252 &mut self.dst,
253 self.dst_start,
254 self.dst_start + len,
255 ))?;
256 if frame_info.block_checksums {
257 let expected_checksum = Self::read_checksum(&mut self.r)?;
258 Self::check_block_checksum(
259 &self.dst[self.dst_start..self.dst_start + len],
260 expected_checksum,
261 )?;
262 }
263
264 self.dst_end += len;
265 self.content_len += len as u64;
266 }
267 BlockInfo::Compressed(len) => {
268 let len = len as usize;
269 if len > max_block_size {
270 return Err(Error::BlockTooBig.into());
271 }
272 // TODO: Attempt to avoid initialization of read buffer when
273 // https://github.com/rust-lang/rust/issues/42788 stabilizes
274 self.r
275 .read_exact(vec_resize_and_get_mut(&mut self.src, 0, len))?;
276 if frame_info.block_checksums {
277 let expected_checksum = Self::read_checksum(&mut self.r)?;
278 Self::check_block_checksum(&self.src[..len], expected_checksum)?;
279 }
280
281 let with_dict_mode =
282 frame_info.block_mode == BlockMode::Linked && self.ext_dict_len != 0;
283 let decomp_size = if with_dict_mode {
284 debug_assert!(self.dst_start + max_block_size <= self.ext_dict_offset);
285 let (head, tail) = self.dst.split_at_mut(self.ext_dict_offset);
286 let ext_dict = &tail[..self.ext_dict_len];
287
288 debug_assert!(head.len() - self.dst_start >= max_block_size);
289 crate::block::decompress::decompress_internal::<true, _>(
290 &self.src[..len],
291 &mut SliceSink::new(head, self.dst_start),
292 ext_dict,
293 )
294 } else {
295 // Independent blocks OR linked blocks with only prefix data
296 debug_assert!(self.dst.capacity() - self.dst_start >= max_block_size);
297 crate::block::decompress::decompress_internal::<false, _>(
298 &self.src[..len],
299 &mut vec_sink_for_decompression(
300 &mut self.dst,
301 0,
302 self.dst_start,
303 self.dst_start + max_block_size,
304 ),
305 b"",
306 )
307 }
308 .map_err(Error::DecompressionError)?;
309
310 self.dst_end += decomp_size;
311 self.content_len += decomp_size as u64;
312 }
313
314 BlockInfo::EndMark => {
315 if let Some(expected) = frame_info.content_size {
316 if self.content_len != expected {
317 return Err(Error::ContentLengthError {
318 expected,
319 actual: self.content_len,
320 }
321 .into());
322 }
323 }
324 if frame_info.content_checksum {
325 let expected_checksum = Self::read_checksum(&mut self.r)?;
326 let calc_checksum = self.content_hasher.finish() as u32;
327 if calc_checksum != expected_checksum {
328 return Err(Error::ContentChecksumError.into());
329 }
330 }
331 self.current_frame_info = None;
332 return Ok(0);
333 }
334 }
335
336 // Content checksum, if applicable
337 if frame_info.content_checksum {
338 self.content_hasher
339 .write(&self.dst[self.dst_start..self.dst_end]);
340 }
341
342 Ok(self.dst_end - self.dst_start)
343 }
344
read_more(&mut self) -> io::Result<usize>345 fn read_more(&mut self) -> io::Result<usize> {
346 if self.current_frame_info.is_none() && self.read_frame_info()? == 0 {
347 return Ok(0);
348 }
349 self.read_block()
350 }
351 }
352
353 impl<R: io::Read> io::Read for FrameDecoder<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>354 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
355 loop {
356 // Fill read buffer if there's uncompressed data left
357 if self.dst_start < self.dst_end {
358 let read_len = std::cmp::min(self.dst_end - self.dst_start, buf.len());
359 let dst_read_end = self.dst_start + read_len;
360 buf[..read_len].copy_from_slice(&self.dst[self.dst_start..dst_read_end]);
361 self.dst_start = dst_read_end;
362 return Ok(read_len);
363 }
364 if self.read_more()? == 0 {
365 return Ok(0);
366 }
367 }
368 }
369
read_to_string(&mut self, buf: &mut String) -> io::Result<usize>370 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
371 let mut written = 0;
372 loop {
373 match self.fill_buf() {
374 Ok(b) if b.is_empty() => return Ok(written),
375 Ok(b) => {
376 let s = std::str::from_utf8(b).map_err(|_| {
377 io::Error::new(
378 io::ErrorKind::InvalidData,
379 "stream did not contain valid UTF-8",
380 )
381 })?;
382 buf.push_str(s);
383 let len = s.len();
384 self.consume(len);
385 written += len;
386 }
387 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
388 Err(e) => return Err(e),
389 }
390 }
391 }
392
read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize>393 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
394 let mut written = 0;
395 loop {
396 match self.fill_buf() {
397 Ok(b) if b.is_empty() => return Ok(written),
398 Ok(b) => {
399 buf.extend_from_slice(b);
400 let len = b.len();
401 self.consume(len);
402 written += len;
403 }
404 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
405 Err(e) => return Err(e),
406 }
407 }
408 }
409 }
410
411 impl<R: io::Read> io::BufRead for FrameDecoder<R> {
fill_buf(&mut self) -> io::Result<&[u8]>412 fn fill_buf(&mut self) -> io::Result<&[u8]> {
413 if self.dst_start == self.dst_end {
414 self.read_more()?;
415 }
416 Ok(&self.dst[self.dst_start..self.dst_end])
417 }
418
consume(&mut self, amt: usize)419 fn consume(&mut self, amt: usize) {
420 assert!(amt <= self.dst_end - self.dst_start);
421 self.dst_start += amt;
422 }
423 }
424
425 impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result426 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
427 f.debug_struct("FrameDecoder")
428 .field("r", &self.r)
429 .field("content_hasher", &self.content_hasher)
430 .field("content_len", &self.content_len)
431 .field("src", &"[...]")
432 .field("dst", &"[...]")
433 .field("dst_start", &self.dst_start)
434 .field("dst_end", &self.dst_end)
435 .field("ext_dict_offset", &self.ext_dict_offset)
436 .field("ext_dict_len", &self.ext_dict_len)
437 .field("current_frame_info", &self.current_frame_info)
438 .finish()
439 }
440 }
441
442 /// Similar to `v.get_mut(start..end) but will adjust the len if needed.
443 #[inline]
vec_resize_and_get_mut(v: &mut Vec<u8>, start: usize, end: usize) -> &mut [u8]444 fn vec_resize_and_get_mut(v: &mut Vec<u8>, start: usize, end: usize) -> &mut [u8] {
445 if end > v.len() {
446 v.resize(end, 0)
447 }
448 &mut v[start..end]
449 }
450