xref: /aosp_15_r20/external/zstd/contrib/pzstd/Pzstd.h (revision 01826a4963a0d8a59bc3812d29bdf0fb76416722)
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 */
9*01826a49SYabin Cui #pragma once
10*01826a49SYabin Cui 
11*01826a49SYabin Cui #include "ErrorHolder.h"
12*01826a49SYabin Cui #include "Logging.h"
13*01826a49SYabin Cui #include "Options.h"
14*01826a49SYabin Cui #include "utils/Buffer.h"
15*01826a49SYabin Cui #include "utils/Range.h"
16*01826a49SYabin Cui #include "utils/ResourcePool.h"
17*01826a49SYabin Cui #include "utils/ThreadPool.h"
18*01826a49SYabin Cui #include "utils/WorkQueue.h"
19*01826a49SYabin Cui #define ZSTD_STATIC_LINKING_ONLY
20*01826a49SYabin Cui #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated
21*01826a49SYabin Cui                                          * and uses deprecated functions
22*01826a49SYabin Cui                                          */
23*01826a49SYabin Cui #include "zstd.h"
24*01826a49SYabin Cui #undef ZSTD_STATIC_LINKING_ONLY
25*01826a49SYabin Cui 
26*01826a49SYabin Cui #include <cstddef>
27*01826a49SYabin Cui #include <cstdint>
28*01826a49SYabin Cui #include <memory>
29*01826a49SYabin Cui 
30*01826a49SYabin Cui namespace pzstd {
31*01826a49SYabin Cui /**
32*01826a49SYabin Cui  * Runs pzstd with `options` and returns the number of bytes written.
33*01826a49SYabin Cui  * An error occurred if `errorHandler.hasError()`.
34*01826a49SYabin Cui  *
35*01826a49SYabin Cui  * @param options      The pzstd options to use for (de)compression
36*01826a49SYabin Cui  * @returns            0 upon success and non-zero on failure.
37*01826a49SYabin Cui  */
38*01826a49SYabin Cui int pzstdMain(const Options& options);
39*01826a49SYabin Cui 
40*01826a49SYabin Cui class SharedState {
41*01826a49SYabin Cui  public:
SharedState(const Options & options)42*01826a49SYabin Cui   SharedState(const Options& options) : log(options.verbosity) {
43*01826a49SYabin Cui     if (!options.decompress) {
44*01826a49SYabin Cui       auto parameters = options.determineParameters();
45*01826a49SYabin Cui       cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
46*01826a49SYabin Cui           [this, parameters]() -> ZSTD_CStream* {
47*01826a49SYabin Cui             this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream");
48*01826a49SYabin Cui             auto zcs = ZSTD_createCStream();
49*01826a49SYabin Cui             if (zcs) {
50*01826a49SYabin Cui               auto err = ZSTD_initCStream_advanced(
51*01826a49SYabin Cui                   zcs, nullptr, 0, parameters, 0);
52*01826a49SYabin Cui               if (ZSTD_isError(err)) {
53*01826a49SYabin Cui                 ZSTD_freeCStream(zcs);
54*01826a49SYabin Cui                 return nullptr;
55*01826a49SYabin Cui               }
56*01826a49SYabin Cui             }
57*01826a49SYabin Cui             return zcs;
58*01826a49SYabin Cui           },
59*01826a49SYabin Cui           [](ZSTD_CStream *zcs) {
60*01826a49SYabin Cui             ZSTD_freeCStream(zcs);
61*01826a49SYabin Cui           }});
62*01826a49SYabin Cui     } else {
63*01826a49SYabin Cui       dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
64*01826a49SYabin Cui           [this]() -> ZSTD_DStream* {
65*01826a49SYabin Cui             this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream");
66*01826a49SYabin Cui             auto zds = ZSTD_createDStream();
67*01826a49SYabin Cui             if (zds) {
68*01826a49SYabin Cui               auto err = ZSTD_initDStream(zds);
69*01826a49SYabin Cui               if (ZSTD_isError(err)) {
70*01826a49SYabin Cui                 ZSTD_freeDStream(zds);
71*01826a49SYabin Cui                 return nullptr;
72*01826a49SYabin Cui               }
73*01826a49SYabin Cui             }
74*01826a49SYabin Cui             return zds;
75*01826a49SYabin Cui           },
76*01826a49SYabin Cui           [](ZSTD_DStream *zds) {
77*01826a49SYabin Cui             ZSTD_freeDStream(zds);
78*01826a49SYabin Cui           }});
79*01826a49SYabin Cui     }
80*01826a49SYabin Cui   }
81*01826a49SYabin Cui 
~SharedState()82*01826a49SYabin Cui   ~SharedState() {
83*01826a49SYabin Cui     // The resource pools have references to this, so destroy them first.
84*01826a49SYabin Cui     cStreamPool.reset();
85*01826a49SYabin Cui     dStreamPool.reset();
86*01826a49SYabin Cui   }
87*01826a49SYabin Cui 
88*01826a49SYabin Cui   Logger log;
89*01826a49SYabin Cui   ErrorHolder errorHolder;
90*01826a49SYabin Cui   std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
91*01826a49SYabin Cui   std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
92*01826a49SYabin Cui };
93*01826a49SYabin Cui 
94*01826a49SYabin Cui /**
95*01826a49SYabin Cui  * Streams input from `fd`, breaks input up into chunks, and compresses each
96*01826a49SYabin Cui  * chunk independently.  Output of each chunk gets streamed to a queue, and
97*01826a49SYabin Cui  * the output queues get put into `chunks` in order.
98*01826a49SYabin Cui  *
99*01826a49SYabin Cui  * @param state        The shared state
100*01826a49SYabin Cui  * @param chunks       Each compression jobs output queue gets `pushed()` here
101*01826a49SYabin Cui  *                      as soon as it is available
102*01826a49SYabin Cui  * @param executor     The thread pool to run compression jobs in
103*01826a49SYabin Cui  * @param fd           The input file descriptor
104*01826a49SYabin Cui  * @param size         The size of the input file if known, 0 otherwise
105*01826a49SYabin Cui  * @param numThreads   The number of threads in the thread pool
106*01826a49SYabin Cui  * @param parameters   The zstd parameters to use for compression
107*01826a49SYabin Cui  * @returns            The number of bytes read from the file
108*01826a49SYabin Cui  */
109*01826a49SYabin Cui std::uint64_t asyncCompressChunks(
110*01826a49SYabin Cui     SharedState& state,
111*01826a49SYabin Cui     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
112*01826a49SYabin Cui     ThreadPool& executor,
113*01826a49SYabin Cui     FILE* fd,
114*01826a49SYabin Cui     std::uintmax_t size,
115*01826a49SYabin Cui     std::size_t numThreads,
116*01826a49SYabin Cui     ZSTD_parameters parameters);
117*01826a49SYabin Cui 
118*01826a49SYabin Cui /**
119*01826a49SYabin Cui  * Streams input from `fd`.  If pzstd headers are available it breaks the input
120*01826a49SYabin Cui  * up into independent frames.  It sends each frame to an independent
121*01826a49SYabin Cui  * decompression job.  Output of each frame gets streamed to a queue, and
122*01826a49SYabin Cui  * the output queues get put into `frames` in order.
123*01826a49SYabin Cui  *
124*01826a49SYabin Cui  * @param state        The shared state
125*01826a49SYabin Cui  * @param frames       Each decompression jobs output queue gets `pushed()` here
126*01826a49SYabin Cui  *                      as soon as it is available
127*01826a49SYabin Cui  * @param executor     The thread pool to run compression jobs in
128*01826a49SYabin Cui  * @param fd           The input file descriptor
129*01826a49SYabin Cui  * @returns            The number of bytes read from the file
130*01826a49SYabin Cui  */
131*01826a49SYabin Cui std::uint64_t asyncDecompressFrames(
132*01826a49SYabin Cui     SharedState& state,
133*01826a49SYabin Cui     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
134*01826a49SYabin Cui     ThreadPool& executor,
135*01826a49SYabin Cui     FILE* fd);
136*01826a49SYabin Cui 
137*01826a49SYabin Cui /**
138*01826a49SYabin Cui  * Streams input in from each queue in `outs` in order, and writes the data to
139*01826a49SYabin Cui  * `outputFd`.
140*01826a49SYabin Cui  *
141*01826a49SYabin Cui  * @param state        The shared state
142*01826a49SYabin Cui  * @param outs         A queue of output queues, one for each
143*01826a49SYabin Cui  *                      (de)compression job.
144*01826a49SYabin Cui  * @param outputFd     The file descriptor to write to
145*01826a49SYabin Cui  * @param decompress   Are we decompressing?
146*01826a49SYabin Cui  * @returns            The number of bytes written
147*01826a49SYabin Cui  */
148*01826a49SYabin Cui std::uint64_t writeFile(
149*01826a49SYabin Cui     SharedState& state,
150*01826a49SYabin Cui     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
151*01826a49SYabin Cui     FILE* outputFd,
152*01826a49SYabin Cui     bool decompress);
153*01826a49SYabin Cui }
154