1*01826a49SYabin Cui /* 2*01826a49SYabin Cui * Copyright (c) Meta Platforms, Inc. and affiliates. 3*01826a49SYabin Cui * All rights reserved. 4*01826a49SYabin Cui * 5*01826a49SYabin Cui * This source code is licensed under both the BSD-style license (found in the 6*01826a49SYabin Cui * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7*01826a49SYabin Cui * in the COPYING file in the root directory of this source tree). 8*01826a49SYabin Cui */ 9*01826a49SYabin Cui #pragma once 10*01826a49SYabin Cui 11*01826a49SYabin Cui #include "ErrorHolder.h" 12*01826a49SYabin Cui #include "Logging.h" 13*01826a49SYabin Cui #include "Options.h" 14*01826a49SYabin Cui #include "utils/Buffer.h" 15*01826a49SYabin Cui #include "utils/Range.h" 16*01826a49SYabin Cui #include "utils/ResourcePool.h" 17*01826a49SYabin Cui #include "utils/ThreadPool.h" 18*01826a49SYabin Cui #include "utils/WorkQueue.h" 19*01826a49SYabin Cui #define ZSTD_STATIC_LINKING_ONLY 20*01826a49SYabin Cui #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated 21*01826a49SYabin Cui * and uses deprecated functions 22*01826a49SYabin Cui */ 23*01826a49SYabin Cui #include "zstd.h" 24*01826a49SYabin Cui #undef ZSTD_STATIC_LINKING_ONLY 25*01826a49SYabin Cui 26*01826a49SYabin Cui #include <cstddef> 27*01826a49SYabin Cui #include <cstdint> 28*01826a49SYabin Cui #include <memory> 29*01826a49SYabin Cui 30*01826a49SYabin Cui namespace pzstd { 31*01826a49SYabin Cui /** 32*01826a49SYabin Cui * Runs pzstd with `options` and returns the number of bytes written. 33*01826a49SYabin Cui * An error occurred if `errorHandler.hasError()`. 34*01826a49SYabin Cui * 35*01826a49SYabin Cui * @param options The pzstd options to use for (de)compression 36*01826a49SYabin Cui * @returns 0 upon success and non-zero on failure. 37*01826a49SYabin Cui */ 38*01826a49SYabin Cui int pzstdMain(const Options& options); 39*01826a49SYabin Cui 40*01826a49SYabin Cui class SharedState { 41*01826a49SYabin Cui public: SharedState(const Options & options)42*01826a49SYabin Cui SharedState(const Options& options) : log(options.verbosity) { 43*01826a49SYabin Cui if (!options.decompress) { 44*01826a49SYabin Cui auto parameters = options.determineParameters(); 45*01826a49SYabin Cui cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ 46*01826a49SYabin Cui [this, parameters]() -> ZSTD_CStream* { 47*01826a49SYabin Cui this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream"); 48*01826a49SYabin Cui auto zcs = ZSTD_createCStream(); 49*01826a49SYabin Cui if (zcs) { 50*01826a49SYabin Cui auto err = ZSTD_initCStream_advanced( 51*01826a49SYabin Cui zcs, nullptr, 0, parameters, 0); 52*01826a49SYabin Cui if (ZSTD_isError(err)) { 53*01826a49SYabin Cui ZSTD_freeCStream(zcs); 54*01826a49SYabin Cui return nullptr; 55*01826a49SYabin Cui } 56*01826a49SYabin Cui } 57*01826a49SYabin Cui return zcs; 58*01826a49SYabin Cui }, 59*01826a49SYabin Cui [](ZSTD_CStream *zcs) { 60*01826a49SYabin Cui ZSTD_freeCStream(zcs); 61*01826a49SYabin Cui }}); 62*01826a49SYabin Cui } else { 63*01826a49SYabin Cui dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ 64*01826a49SYabin Cui [this]() -> ZSTD_DStream* { 65*01826a49SYabin Cui this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream"); 66*01826a49SYabin Cui auto zds = ZSTD_createDStream(); 67*01826a49SYabin Cui if (zds) { 68*01826a49SYabin Cui auto err = ZSTD_initDStream(zds); 69*01826a49SYabin Cui if (ZSTD_isError(err)) { 70*01826a49SYabin Cui ZSTD_freeDStream(zds); 71*01826a49SYabin Cui return nullptr; 72*01826a49SYabin Cui } 73*01826a49SYabin Cui } 74*01826a49SYabin Cui return zds; 75*01826a49SYabin Cui }, 76*01826a49SYabin Cui [](ZSTD_DStream *zds) { 77*01826a49SYabin Cui ZSTD_freeDStream(zds); 78*01826a49SYabin Cui }}); 79*01826a49SYabin Cui } 80*01826a49SYabin Cui } 81*01826a49SYabin Cui ~SharedState()82*01826a49SYabin Cui ~SharedState() { 83*01826a49SYabin Cui // The resource pools have references to this, so destroy them first. 84*01826a49SYabin Cui cStreamPool.reset(); 85*01826a49SYabin Cui dStreamPool.reset(); 86*01826a49SYabin Cui } 87*01826a49SYabin Cui 88*01826a49SYabin Cui Logger log; 89*01826a49SYabin Cui ErrorHolder errorHolder; 90*01826a49SYabin Cui std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; 91*01826a49SYabin Cui std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; 92*01826a49SYabin Cui }; 93*01826a49SYabin Cui 94*01826a49SYabin Cui /** 95*01826a49SYabin Cui * Streams input from `fd`, breaks input up into chunks, and compresses each 96*01826a49SYabin Cui * chunk independently. Output of each chunk gets streamed to a queue, and 97*01826a49SYabin Cui * the output queues get put into `chunks` in order. 98*01826a49SYabin Cui * 99*01826a49SYabin Cui * @param state The shared state 100*01826a49SYabin Cui * @param chunks Each compression jobs output queue gets `pushed()` here 101*01826a49SYabin Cui * as soon as it is available 102*01826a49SYabin Cui * @param executor The thread pool to run compression jobs in 103*01826a49SYabin Cui * @param fd The input file descriptor 104*01826a49SYabin Cui * @param size The size of the input file if known, 0 otherwise 105*01826a49SYabin Cui * @param numThreads The number of threads in the thread pool 106*01826a49SYabin Cui * @param parameters The zstd parameters to use for compression 107*01826a49SYabin Cui * @returns The number of bytes read from the file 108*01826a49SYabin Cui */ 109*01826a49SYabin Cui std::uint64_t asyncCompressChunks( 110*01826a49SYabin Cui SharedState& state, 111*01826a49SYabin Cui WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, 112*01826a49SYabin Cui ThreadPool& executor, 113*01826a49SYabin Cui FILE* fd, 114*01826a49SYabin Cui std::uintmax_t size, 115*01826a49SYabin Cui std::size_t numThreads, 116*01826a49SYabin Cui ZSTD_parameters parameters); 117*01826a49SYabin Cui 118*01826a49SYabin Cui /** 119*01826a49SYabin Cui * Streams input from `fd`. If pzstd headers are available it breaks the input 120*01826a49SYabin Cui * up into independent frames. It sends each frame to an independent 121*01826a49SYabin Cui * decompression job. Output of each frame gets streamed to a queue, and 122*01826a49SYabin Cui * the output queues get put into `frames` in order. 123*01826a49SYabin Cui * 124*01826a49SYabin Cui * @param state The shared state 125*01826a49SYabin Cui * @param frames Each decompression jobs output queue gets `pushed()` here 126*01826a49SYabin Cui * as soon as it is available 127*01826a49SYabin Cui * @param executor The thread pool to run compression jobs in 128*01826a49SYabin Cui * @param fd The input file descriptor 129*01826a49SYabin Cui * @returns The number of bytes read from the file 130*01826a49SYabin Cui */ 131*01826a49SYabin Cui std::uint64_t asyncDecompressFrames( 132*01826a49SYabin Cui SharedState& state, 133*01826a49SYabin Cui WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, 134*01826a49SYabin Cui ThreadPool& executor, 135*01826a49SYabin Cui FILE* fd); 136*01826a49SYabin Cui 137*01826a49SYabin Cui /** 138*01826a49SYabin Cui * Streams input in from each queue in `outs` in order, and writes the data to 139*01826a49SYabin Cui * `outputFd`. 140*01826a49SYabin Cui * 141*01826a49SYabin Cui * @param state The shared state 142*01826a49SYabin Cui * @param outs A queue of output queues, one for each 143*01826a49SYabin Cui * (de)compression job. 144*01826a49SYabin Cui * @param outputFd The file descriptor to write to 145*01826a49SYabin Cui * @param decompress Are we decompressing? 146*01826a49SYabin Cui * @returns The number of bytes written 147*01826a49SYabin Cui */ 148*01826a49SYabin Cui std::uint64_t writeFile( 149*01826a49SYabin Cui SharedState& state, 150*01826a49SYabin Cui WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, 151*01826a49SYabin Cui FILE* outputFd, 152*01826a49SYabin Cui bool decompress); 153*01826a49SYabin Cui } 154