1 use std::{
2     fmt,
3     hash::Hasher,
4     io::{self, Write},
5 };
6 use twox_hash::XxHash32;
7 
8 use crate::{
9     block::{
10         compress::compress_internal,
11         hashtable::{HashTable, HashTable4K},
12     },
13     sink::vec_sink_for_compression,
14 };
15 
16 use super::Error;
17 use super::{
18     header::{BlockInfo, BlockMode, FrameInfo, BLOCK_INFO_SIZE, MAX_FRAME_INFO_SIZE},
19     BlockSize,
20 };
21 use crate::block::WINDOW_SIZE;
22 
/// A writer for compressing a LZ4 stream.
///
/// This `FrameEncoder` wraps any other writer that implements `io::Write`.
/// Bytes written to this writer are compressed using the [LZ4 frame
/// format](https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md).
///
/// Writes are buffered automatically, so there's no need to wrap the given
/// writer in a `std::io::BufWriter`.
///
/// To ensure a well formed stream the encoder must be finalized by calling
/// either the [`finish()`], [`try_finish()`], or [`auto_finish()`] methods.
///
/// [`finish()`]: Self::finish
/// [`try_finish()`]: Self::try_finish
/// [`auto_finish()`]: Self::auto_finish
///
/// # Example 1
/// Serializing json values into a compressed file.
///
/// ```no_run
/// let compressed_file = std::fs::File::create("datafile").unwrap();
/// let mut compressor = lz4_flex::frame::FrameEncoder::new(compressed_file);
/// serde_json::to_writer(&mut compressor, &serde_json::json!({ "an": "object" })).unwrap();
/// compressor.finish().unwrap();
/// ```
///
/// # Example 2
/// Serializing multiple json values into a compressed file using linked blocks.
///
/// ```no_run
/// let compressed_file = std::fs::File::create("datafile").unwrap();
/// let mut frame_info = lz4_flex::frame::FrameInfo::new();
/// frame_info.block_mode = lz4_flex::frame::BlockMode::Linked;
/// let mut compressor = lz4_flex::frame::FrameEncoder::with_frame_info(frame_info, compressed_file);
/// for i in 0..10u64 {
///     serde_json::to_writer(&mut compressor, &serde_json::json!({ "i": i })).unwrap();
/// }
/// compressor.finish().unwrap();
/// ```
pub struct FrameEncoder<W: io::Write> {
    /// Our buffer of uncompressed bytes.
    ///
    /// In linked block mode it also retains already-compressed bytes (a prefix and
    /// possibly an external dictionary) that later blocks may reference.
    src: Vec<u8>,
    /// Index into src: starting point of bytes not yet compressed.
    src_start: usize,
    /// Index into src: end point (exclusive) of bytes not yet compressed.
    src_end: usize,
    /// Index into src: starting point of external dictionary (applicable in Linked block mode)
    ext_dict_offset: usize,
    /// Length of external dictionary; 0 means there is no external dictionary.
    ext_dict_len: usize,
    /// Counter of bytes already compressed to the compression_table, used as the
    /// stream offset passed to the compressor.
    /// _Not_ the same as `content_len`: it is rebased (down to `ext_dict_len`) whenever
    /// it gets close to `u32::MAX / 2` (about 2 GiB), and it is reset when a new frame
    /// begins.
    src_stream_offset: usize,
    /// Encoder hash table used to find matches.
    compression_table: HashTable4K,
    /// The underlying writer.
    w: W,
    /// Xxhash32 used when content checksum is enabled.
    content_hasher: XxHash32,
    /// Number of uncompressed bytes written to the current frame; checked against
    /// `frame_info.content_size` when the frame ends.
    content_len: u64,
    /// The compressed bytes buffer. Each block is compressed from src into dst before
    /// being written to w; blocks that do not shrink are written straight from src instead.
    dst: Vec<u8>,
    /// Whether a frame header has been written and that frame has not been ended yet.
    is_frame_open: bool,
    /// Whether at least one frame has been completed by `try_finish`. Used to decide
    /// whether finishing must emit an empty frame for empty input.
    data_to_frame_written: bool,
    /// The frame information to be used in this encoder.
    frame_info: FrameInfo,
}
94 
95 impl<W: io::Write> FrameEncoder<W> {
init(&mut self)96     fn init(&mut self) {
97         let max_block_size = self.frame_info.block_size.get_size();
98         let src_size = if self.frame_info.block_mode == BlockMode::Linked {
99             // In linked mode we consume the input (bumping src_start) but leave the
100             // beginning of src to be used as a prefix in subsequent blocks.
101             // That is at least until we have at least `max_block_size + WINDOW_SIZE`
102             // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes
103             // and the input goes to the beginning of src again.
104             // Since we always want to be able to write a full block (up to max_block_size)
105             // we need a buffer with at least `max_block_size * 2 + WINDOW_SIZE` bytes.
106             max_block_size * 2 + WINDOW_SIZE
107         } else {
108             max_block_size
109         };
110         // Since this method is called potentially multiple times, don't reserve _additional_
111         // capacity if not required.
112         self.src
113             .reserve(src_size.saturating_sub(self.src.capacity()));
114         self.dst.reserve(
115             crate::block::compress::get_maximum_output_size(max_block_size)
116                 .saturating_sub(self.dst.capacity()),
117         );
118     }
119 
120     /// Returns a wrapper around `self` that will finish the stream on drop.
121     ///
122     /// # Note
123     /// Errors on drop get silently ignored. If you want to handle errors then use [`finish()`] or
124     /// [`try_finish()`] instead.
125     ///
126     /// [`finish()`]: Self::finish
127     /// [`try_finish()`]: Self::try_finish
auto_finish(self) -> AutoFinishEncoder<W>128     pub fn auto_finish(self) -> AutoFinishEncoder<W> {
129         AutoFinishEncoder {
130             encoder: Some(self),
131         }
132     }
133 
134     /// Creates a new Encoder with the specified FrameInfo.
with_frame_info(frame_info: FrameInfo, wtr: W) -> Self135     pub fn with_frame_info(frame_info: FrameInfo, wtr: W) -> Self {
136         FrameEncoder {
137             src: Vec::new(),
138             w: wtr,
139             // 16 KB hash table for matches, same as the reference implementation.
140             compression_table: HashTable4K::new(),
141             content_hasher: XxHash32::with_seed(0),
142             content_len: 0,
143             dst: Vec::new(),
144             is_frame_open: false,
145             data_to_frame_written: false,
146             frame_info,
147             src_start: 0,
148             src_end: 0,
149             ext_dict_offset: 0,
150             ext_dict_len: 0,
151             src_stream_offset: 0,
152         }
153     }
154 
155     /// Creates a new Encoder with the default settings.
new(wtr: W) -> Self156     pub fn new(wtr: W) -> Self {
157         Self::with_frame_info(Default::default(), wtr)
158     }
159 
160     /// The frame information used by this Encoder.
frame_info(&mut self) -> &FrameInfo161     pub fn frame_info(&mut self) -> &FrameInfo {
162         &self.frame_info
163     }
164 
165     /// Consumes this encoder, flushing internal buffer and writing stream terminator.
finish(mut self) -> Result<W, Error>166     pub fn finish(mut self) -> Result<W, Error> {
167         self.try_finish()?;
168         Ok(self.w)
169     }
170 
171     /// Attempt to finish this output stream, flushing internal buffer and writing stream
172     /// terminator.
try_finish(&mut self) -> Result<(), Error>173     pub fn try_finish(&mut self) -> Result<(), Error> {
174         match self.flush() {
175             Ok(()) => {
176                 // Empty input special case
177                 // https://github.com/ouch-org/ouch/pull/163#discussion_r1108965151
178                 if !self.is_frame_open && !self.data_to_frame_written {
179                     self.begin_frame(0)?;
180                 }
181                 self.end_frame()?;
182                 self.data_to_frame_written = true;
183                 Ok(())
184             }
185             Err(err) => Err(err.into()),
186         }
187     }
188 
189     /// Returns the underlying writer _without_ flushing the stream.
190     /// This may leave the output in an unfinished state.
into_inner(self) -> W191     pub fn into_inner(self) -> W {
192         self.w
193     }
194 
195     /// Gets a reference to the underlying writer in this encoder.
get_ref(&self) -> &W196     pub fn get_ref(&self) -> &W {
197         &self.w
198     }
199 
200     /// Gets a reference to the underlying writer in this encoder.
201     ///
202     /// Note that mutating the output/input state of the stream may corrupt
203     /// this encoder, so care must be taken when using this method.
get_mut(&mut self) -> &mut W204     pub fn get_mut(&mut self) -> &mut W {
205         &mut self.w
206     }
207 
208     /// Closes the frame by writing the end marker.
end_frame(&mut self) -> Result<(), Error>209     fn end_frame(&mut self) -> Result<(), Error> {
210         debug_assert!(self.is_frame_open);
211         self.is_frame_open = false;
212         if let Some(expected) = self.frame_info.content_size {
213             if expected != self.content_len {
214                 return Err(Error::ContentLengthError {
215                     expected,
216                     actual: self.content_len,
217                 });
218             }
219         }
220 
221         let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE];
222         BlockInfo::EndMark.write(&mut block_info_buffer[..])?;
223         self.w.write_all(&block_info_buffer[..])?;
224         if self.frame_info.content_checksum {
225             let content_checksum = self.content_hasher.finish() as u32;
226             self.w.write_all(&content_checksum.to_le_bytes())?;
227         }
228 
229         Ok(())
230     }
231 
232     /// Begin the frame by writing the frame header.
233     /// It'll also setup the encoder for compressing blocks for the the new frame.
begin_frame(&mut self, buf_len: usize) -> io::Result<()>234     fn begin_frame(&mut self, buf_len: usize) -> io::Result<()> {
235         self.is_frame_open = true;
236         if self.frame_info.block_size == BlockSize::Auto {
237             self.frame_info.block_size = BlockSize::from_buf_length(buf_len);
238         }
239         self.init();
240         let mut frame_info_buffer = [0u8; MAX_FRAME_INFO_SIZE];
241         let size = self.frame_info.write(&mut frame_info_buffer)?;
242         self.w.write_all(&frame_info_buffer[..size])?;
243 
244         if self.content_len != 0 {
245             // This is the second or later frame for this Encoder,
246             // reset compressor state for the new frame.
247             self.content_len = 0;
248             self.src_stream_offset = 0;
249             self.src.clear();
250             self.src_start = 0;
251             self.src_end = 0;
252             self.ext_dict_len = 0;
253             self.content_hasher = XxHash32::with_seed(0);
254             self.compression_table.clear();
255         }
256         Ok(())
257     }
258 
    /// Consumes the src contents between src_start and src_end,
    /// which shouldn't exceed the max block size.
    ///
    /// The pending bytes are compressed into a single block and written to `w`, preceded
    /// by the block info. The block is stored uncompressed if compression does not make
    /// it strictly smaller.
    ///
    /// Checksums are only produced when enabled in `frame_info`:
    /// - the block checksum, computed over the bytes actually written;
    /// - the running content checksum, updated with the uncompressed bytes.
    ///
    /// On return `src_start == src_end`. In linked mode the consumed bytes remain in `src`
    /// as lookback for later blocks. In independent mode the buffer is rewound to index 0.
    fn write_block(&mut self) -> io::Result<()> {
        debug_assert!(self.is_frame_open);
        let max_block_size = self.frame_info.block_size.get_size();
        debug_assert!(self.src_end - self.src_start <= max_block_size);

        // Reposition the compression table if we're anywhere near an overflowing hazard.
        // The stream offset is kept below `u32::MAX / 2`: the table is repositioned by
        // `src_stream_offset - ext_dict_len` and the offset restarts at `ext_dict_len`.
        // NOTE(review): presumably this keeps the ext_dict addressable at non-negative
        // offsets; confirm against `HashTable::reposition`.
        if self.src_stream_offset + max_block_size + WINDOW_SIZE >= u32::MAX as usize / 2 {
            self.compression_table
                .reposition((self.src_stream_offset - self.ext_dict_len) as _);
            self.src_stream_offset = self.ext_dict_len;
        }

        // input to the compressor, which may include a prefix when blocks are linked
        let input = &self.src[..self.src_end];
        // the contents of the block are between src_start and src_end
        let src = &input[self.src_start..];

        // Output size bound for this block, used to size the `dst` sink.
        let dst_required_size = crate::block::compress::get_maximum_output_size(src.len());

        // An ext_dict only exists in linked mode. When present it is handed to the
        // compressor as an additional lookback slice taken from elsewhere in `src`.
        let compress_result = if self.ext_dict_len != 0 {
            debug_assert_eq!(self.frame_info.block_mode, BlockMode::Linked);
            compress_internal::<_, true, _>(
                input,
                self.src_start,
                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
                &mut self.compression_table,
                &self.src[self.ext_dict_offset..self.ext_dict_offset + self.ext_dict_len],
                self.src_stream_offset,
            )
        } else {
            compress_internal::<_, false, _>(
                input,
                self.src_start,
                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
                &mut self.compression_table,
                b"",
                self.src_stream_offset,
            )
        };

        // Keep the compressed form only if it is strictly smaller than the raw input.
        // Otherwise store the block uncompressed, straight from `src`.
        // (The `?` here relies on a conversion from `Error` into `io::Error`.)
        let (block_info, block_data) = match compress_result.map_err(Error::CompressionError)? {
            comp_len if comp_len < src.len() => {
                (BlockInfo::Compressed(comp_len as _), &self.dst[..comp_len])
            }
            _ => (BlockInfo::Uncompressed(src.len() as _), src),
        };

        // Write the (un)compressed block to the writer and the block checksum (if applicable).
        // The block checksum covers the stored bytes (`block_data`), not the raw input.
        let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE];
        block_info.write(&mut block_info_buffer[..])?;
        self.w.write_all(&block_info_buffer[..])?;
        self.w.write_all(block_data)?;
        if self.frame_info.block_checksums {
            let mut block_hasher = XxHash32::with_seed(0);
            block_hasher.write(block_data);
            let block_checksum = block_hasher.finish() as u32;
            self.w.write_all(&block_checksum.to_le_bytes())?;
        }

        // Content checksum, if applicable (computed over the uncompressed bytes).
        if self.frame_info.content_checksum {
            self.content_hasher.write(src);
        }

        // Buffer and offsets maintenance
        self.content_len += src.len() as u64;
        self.src_start += src.len();
        debug_assert_eq!(self.src_start, self.src_end);
        if self.frame_info.block_mode == BlockMode::Linked {
            // In linked mode we consume the input (bumping src_start) but leave the
            // beginning of src to be used as a prefix in subsequent blocks.
            // That is at least until we have at least `max_block_size + WINDOW_SIZE`
            // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes
            // and the input goes to the beginning of src again.
            debug_assert_eq!(self.src.capacity(), max_block_size * 2 + WINDOW_SIZE);
            if self.src_start >= max_block_size + WINDOW_SIZE {
                // The ext_dict will become the last WINDOW_SIZE bytes
                self.ext_dict_offset = self.src_end - WINDOW_SIZE;
                self.ext_dict_len = WINDOW_SIZE;
                // Input goes in the beginning of the buffer again.
                self.src_stream_offset += self.src_end;
                self.src_start = 0;
                self.src_end = 0;
            } else if self.src_start + self.ext_dict_len > WINDOW_SIZE {
                // There's more than WINDOW_SIZE bytes of lookback adding the prefix and ext_dict.
                // Since we have a limited buffer we must shrink ext_dict in favor of the prefix,
                // so that we can fit up to max_block_size bytes between src_start and the
                // ext_dict start.
                let delta = self
                    .ext_dict_len
                    .min(self.src_start + self.ext_dict_len - WINDOW_SIZE);
                self.ext_dict_offset += delta;
                self.ext_dict_len -= delta;
                debug_assert!(self.src_start + self.ext_dict_len >= WINDOW_SIZE)
            }
            // Invariant: the next block's input region never overlaps the ext_dict.
            debug_assert!(
                self.ext_dict_len == 0 || self.src_start + max_block_size <= self.ext_dict_offset
            );
        } else {
            // In independent block mode we consume the entire src buffer
            // which is sized equal to the frame max_block_size.
            debug_assert_eq!(self.ext_dict_len, 0);
            debug_assert_eq!(self.src.capacity(), max_block_size);
            self.src_start = 0;
            self.src_end = 0;
            // Advance stream offset so we don't have to reset the match dict
            // for the next block.
            self.src_stream_offset += src.len();
        }
        debug_assert!(self.src_start <= self.src_end);
        debug_assert!(self.src_start + max_block_size <= self.src.capacity());
        Ok(())
    }
374 }
375 
376 impl<W: io::Write> io::Write for FrameEncoder<W> {
write(&mut self, mut buf: &[u8]) -> io::Result<usize>377     fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
378         if !self.is_frame_open && !buf.is_empty() {
379             self.begin_frame(buf.len())?;
380         }
381         let buf_len = buf.len();
382         while !buf.is_empty() {
383             let src_filled = self.src_end - self.src_start;
384             let max_fill_len = self.frame_info.block_size.get_size() - src_filled;
385             if max_fill_len == 0 {
386                 // make space by writing next block
387                 self.write_block()?;
388                 debug_assert_eq!(self.src_end, self.src_start);
389                 continue;
390             }
391 
392             let fill_len = max_fill_len.min(buf.len());
393             vec_copy_overwriting(&mut self.src, self.src_end, &buf[..fill_len]);
394             buf = &buf[fill_len..];
395             self.src_end += fill_len;
396         }
397         Ok(buf_len)
398     }
399 
flush(&mut self) -> io::Result<()>400     fn flush(&mut self) -> io::Result<()> {
401         if self.src_start != self.src_end {
402             self.write_block()?;
403         }
404         Ok(())
405     }
406 }
407 
/// A wrapper around a [`FrameEncoder<W>`] that finishes the stream on drop.
///
/// This can be created by the [`auto_finish()`] method on the [`FrameEncoder<W>`].
/// Writes and flushes are forwarded to the wrapped encoder.
///
/// # Note
/// Errors on drop get silently ignored. If you want to handle errors then use [`finish()`] or
/// [`try_finish()`] instead.
///
/// [`finish()`]: FrameEncoder::finish
/// [`try_finish()`]: FrameEncoder::try_finish
/// [`auto_finish()`]: FrameEncoder::auto_finish
pub struct AutoFinishEncoder<W: Write> {
    // We wrap this in an option to take it during drop.
    // It is `Some` from construction until `drop` runs.
    encoder: Option<FrameEncoder<W>>,
}
423 
424 impl<W: io::Write> Drop for AutoFinishEncoder<W> {
drop(&mut self)425     fn drop(&mut self) {
426         if let Some(mut encoder) = self.encoder.take() {
427             let _ = encoder.try_finish();
428         }
429     }
430 }
431 
432 impl<W: Write> Write for AutoFinishEncoder<W> {
write(&mut self, buf: &[u8]) -> io::Result<usize>433     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
434         self.encoder.as_mut().unwrap().write(buf)
435     }
436 
flush(&mut self) -> io::Result<()>437     fn flush(&mut self) -> io::Result<()> {
438         self.encoder.as_mut().unwrap().flush()
439     }
440 }
441 
442 impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result443     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
444         f.debug_struct("FrameEncoder")
445             .field("w", &self.w)
446             .field("frame_info", &self.frame_info)
447             .field("is_frame_open", &self.is_frame_open)
448             .field("content_hasher", &self.content_hasher)
449             .field("content_len", &self.content_len)
450             .field("dst", &"[...]")
451             .field("src", &"[...]")
452             .field("src_start", &self.src_start)
453             .field("src_end", &self.src_end)
454             .field("ext_dict_offset", &self.ext_dict_offset)
455             .field("ext_dict_len", &self.ext_dict_len)
456             .field("src_stream_offset", &self.src_stream_offset)
457             .finish()
458     }
459 }
460 
/// Writes `src` into `target` beginning at index `target_start`.
///
/// Bytes that land on already-initialized elements overwrite them. Whatever remains is
/// appended, growing `target.len()`. `target_start` must not exceed `target.len()`, and the
/// result must fit in `target.capacity()`.
#[inline]
fn vec_copy_overwriting(target: &mut Vec<u8>, target_start: usize, src: &[u8]) {
    debug_assert!(target_start + src.len() <= target.capacity());

    // Mixing an in-place overwrite with an append lets the caller use the Vec as a ring
    // buffer without ever zero-filling its spare capacity.
    let initialized_tail = target.len() - target_start;
    let (overwrite, append) = src.split_at(initialized_tail.min(src.len()));
    target[target_start..target_start + overwrite.len()].copy_from_slice(overwrite);
    target.extend_from_slice(append);
}
472