/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

#include "platform.h"
#include <stdio.h>   /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h>  /* malloc, free */
#include <assert.h>
#include <errno.h>   /* errno */

#if defined (_MSC_VER)
#  include <sys/stat.h>
#  include <io.h>
#endif

#include "fileio_asyncio.h"
#include "fileio_common.h"

/* **********************************************************************
 *  Sparse write
 ************************************************************************/
/** AIO_fwriteSparse() :
 *  @return : storedSkips,
 *            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is expected to be malloc'ed, hence aligned on size_t */
    size_t bufferSizeT = bufferSize / sizeof(size_t);
    const size_t* const bufferTEnd = bufferT + bufferSizeT;
    const size_t* ptrT = bufferT;
    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */

    if (prefs->testMode) return 0;  /* do not output anything in test mode */

    if (!prefs->sparseFileSupport) {  /* normal write */
        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
        if (sizeCheck != bufferSize)
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        return 0;
    }

    /* avoid int overflow */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
            EXM_THROW(91, "1 GB skip error (sparse file support)");
        storedSkips -= 1 GB;
    }

    while (ptrT < bufferTEnd) {
        size_t nb0T;

        /* adjust last segment if < 32 KB */
        size_t seg0SizeT = segmentSizeT;
        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
        bufferSizeT -= seg0SizeT;

        /* count leading zeroes */
        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
        storedSkips += (unsigned)(nb0T * sizeof(size_t));

        if (nb0T != seg0SizeT) {   /* not all 0s */
            size_t const nbNon0ST = seg0SizeT - nb0T;
            /* skip leading zeros */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            storedSkips = 0;
            /* write the rest */
            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
        }
        ptrT += seg0SizeT;
    }

    {   static size_t const maskT = sizeof(size_t)-1;
        if (bufferSize & maskT) {
            /* size not multiple of sizeof(size_t) : implies end of block */
            const char* const restStart = (const char*)bufferTEnd;
            const char* restPtr = restStart;
            const char* const restEnd = (const char*)buffer + bufferSize;
            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
            storedSkips += (unsigned) (restPtr - restStart);
            if (restPtr != restEnd) {
                /* not all remaining bytes are 0 */
                size_t const restSize = (size_t)(restEnd - restPtr);
                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
                if (fwrite(restPtr, 1, restSize, file) != restSize)
                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                              strerror(errno));
                storedSkips = 0;
    }   }   }

    return storedSkips;
}

static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    if (prefs->testMode) assert(storedSkips == 0);
    if (storedSkips>0) {
        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
            EXM_THROW(69, "Final skip error (sparse file support)");
        /* last zero must be explicitly written,
         * so that skipped ones get implicitly translated as zero by FS */
        {   const char lastZeroByte[1] = { 0 };
            if (fwrite(lastZeroByte, 1, 1, file) != 1)
                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
    }   }
}
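
/* Illustrative usage sketch (not part of the build): the calling protocol for
 * the two helpers above. Zero runs accumulate in `storedSkips` across calls,
 * and AIO_fwriteSparseEnd() materializes the final hole. `blocks`, `nbBlocks`
 * and `prefs` are hypothetical placeholders:
 *
 *   unsigned storedSkips = 0;
 *   size_t i;
 *   for (i = 0; i < nbBlocks; i++)
 *       storedSkips = AIO_fwriteSparse(file, blocks[i].ptr, blocks[i].size,
 *                                      prefs, storedSkips);
 *   AIO_fwriteSparseEnd(prefs, file, storedSkips);
 */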


/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/

/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    return 1;
#else
    return 0;
#endif
}

/* ***********************************
 *  Generic IoPool implementation
 *************************************/
static IOJob_t* AIO_IOPool_createIoJob(IOPoolCtx_t* ctx, size_t bufferSize) {
    IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
    void* const buffer = malloc(bufferSize);
    if(!job || !buffer)
        EXM_THROW(101, "Allocation error : not enough memory");
    job->buffer = buffer;
    job->bufferSize = bufferSize;
    job->usedBufferSize = 0;
    job->file = NULL;
    job->ctx = ctx;
    job->offset = 0;
    return job;
}

/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for a threaded IO pool.
 * Displays a warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if(prefs->asyncIO) {
        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
        assert(MAX_IO_JOBS >= 2);
        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
        ctx->threadPoolActive = 1;
        if (!ctx->threadPool)
            EXM_THROW(104, "Failed creating I/O thread pool");
    }
}

/* AIO_IOPool_init:
 * Allocates and initializes a new I/O thread pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
    int i;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
    ctx->availableJobsCount = ctx->totalIoJobs;
    for(i=0; i < ctx->availableJobsCount; i++) {
        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
}
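
/* A minimal lifecycle sketch for the generic pool, assuming a caller-provided
 * poolFunction and an already-opened FILE* (all names here are illustrative):
 *
 *   AIO_IOPool_init(&ctx, prefs, myPoolFunction, bufferSize);
 *   AIO_IOPool_setFile(&ctx, file);
 *   job = AIO_IOPool_acquireJob(&ctx);
 *   ... fill job->buffer and set job->usedBufferSize ...
 *   AIO_IOPool_enqueueJob(job);   -- runs async only if a thread pool is active
 *   AIO_IOPool_join(&ctx);
 *   AIO_IOPool_setFile(&ctx, NULL);
 *   AIO_IOPool_destroy(&ctx);
 */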


/* AIO_IOPool_threadPoolActive:
 * Check if current operation uses thread pool.
 * Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
    return ctx->threadPool && ctx->threadPoolActive;
}


/* AIO_IOPool_lockJobsMutex:
 * Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_unlockJobsMutex:
 * Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount < ctx->totalIoJobs);
    ctx->availableJobs[ctx->availableJobsCount++] = job;
    AIO_IOPool_unlockJobsMutex(ctx);
}

/* AIO_IOPool_join:
 * Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_setThreaded:
 * Allows (de)activating threaded mode, to be used when the expected overhead
 * of threading costs more than the expected gains. */
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
    assert(threaded == 0 || threaded == 1);
    assert(ctx != NULL);
    if(ctx->threadPoolActive != threaded) {
        AIO_IOPool_join(ctx);
        ctx->threadPoolActive = threaded;
    }
}
/* AIO_IOPool_destroy:
 * Releases a previously allocated IO thread pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
    int i;
    if(ctx->threadPool) {
        /* Make sure we finish all tasks and then free the resources */
        AIO_IOPool_join(ctx);
        /* Make sure we are not leaking availableJobs */
        assert(ctx->availableJobsCount == ctx->totalIoJobs);
        POOL_free(ctx->threadPool);
        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
    }
    assert(ctx->file == NULL);
    for(i=0; i<ctx->availableJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
        free(job->buffer);
        free(job);
    }
}

/* AIO_IOPool_acquireJob:
 * Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
    IOJob_t *job;
    assert(ctx->file != NULL || ctx->prefs->testMode);
    AIO_IOPool_lockJobsMutex(ctx);
    assert(ctx->availableJobsCount > 0);
    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
    AIO_IOPool_unlockJobsMutex(ctx);
    job->usedBufferSize = 0;
    job->file = ctx->file;
    job->offset = 0;
    return job;
}


/* AIO_IOPool_setFile:
 * Sets the destination file for future IO in the pool.
 * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(ctx);
    assert(ctx->availableJobsCount == ctx->totalIoJobs);
    ctx->file = file;
}

static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
    return ctx->file;
}
/* AIO_IOPool_enqueueJob:
 * Enqueues an io job for execution.
 * The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
    IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
    if(AIO_IOPool_threadPoolActive(ctx))
        POOL_add(ctx->threadPool, ctx->poolFunction, job);
    else
        ctx->poolFunction(job);
}

/* ***********************************
 *  WritePool implementation
 *************************************/
/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
    return AIO_IOPool_acquireJob(&ctx->base);
}

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Queues a write job for execution and acquires a new one.
 * After this call, `job` will point to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before calling.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
    AIO_IOPool_enqueueJob(*job);
    *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
}
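
/* A hedged usage sketch for the write pool (names illustrative, error handling
 * elided): each iteration fills the held job's buffer, queues it for writing,
 * and immediately receives a fresh job to fill next.
 *
 *   IOJob_t* job = AIO_WritePool_acquireJob(writeCtx);
 *   while (hasMoreData) {
 *       job->usedBufferSize = produceData(job->buffer, job->bufferSize);
 *       AIO_WritePool_enqueueAndReacquireWriteJob(&job);
 *   }
 *   AIO_WritePool_releaseIoJob(job);
 *   AIO_WritePool_sparseWriteEnd(writeCtx);
 */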

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
    assert(ctx != NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
    ctx->storedSkips = 0;
}

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
    AIO_IOPool_setFile(&ctx->base, file);
    assert(ctx->storedSkips == 0);
}

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_closeFile:
 * Ends sparse writes, closes the writePool's current file, and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
    FILE* const dstFile = ctx->base.file;
    assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
    AIO_WritePool_sparseWriteEnd(ctx);
    AIO_IOPool_setFile(&ctx->base, NULL);
    return fclose(dstFile);
}

/* AIO_WritePool_executeWriteJob:
 * Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
    AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
    ctx->storedSkips = 0;
    return ctx;
}

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes the destination file if needed. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
    /* Make sure we finish all tasks and then free the resources */
    if(AIO_WritePool_getFile(ctx))
        AIO_WritePool_closeFile(ctx);
    AIO_IOPool_destroy(&ctx->base);
    assert(ctx->storedSkips==0);
    free(ctx);
}

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}

/* ***********************************
 *  ReadPool implementation
 *************************************/
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int i;
    for(i=0; i<ctx->completedJobsCount; i++) {
        IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
        AIO_IOPool_releaseIoJob(job);
    }
    ctx->completedJobsCount = 0;
}

static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    AIO_IOPool_lockJobsMutex(&ctx->base);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
    }
    AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching waitingOnOffset and returns it;
 * if no such job is found, returns NULL.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}
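
/* Worked example of the matching above (sizes illustrative; real offsets
 * advance by each job's bufferSize): with three reads in flight, completions
 * may arrive as offsets 64K, 0, 32K. A reader waiting on offset 0 scans
 * completedJobs, misses while only 64K is present, blocks on jobCompletedCond,
 * and returns the offset-0 job once it lands; 32K and 64K stay queued until
 * waitingOnOffset advances to them. */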

/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
    return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
}

/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * May block until such a job completes. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    AIO_IOPool_lockJobsMutex(&ctx->base);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight, continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL);  /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    AIO_IOPool_unlockJobsMutex(&ctx->base);
    return job;
}


/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->reachedEof) {
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}

static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
    job->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += job->bufferSize;
    AIO_IOPool_enqueueJob(job);
}

static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while(ctx->base.availableJobsCount) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}

/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time;
 * it will also be used as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
    if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->completedJobsCount = 0;
    ctx->currentJobHeld = NULL;

    if(ctx->base.threadPool)
        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
            EXM_THROW(103,"Failed creating jobCompletedCond cond");

    return ctx;
}

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes the source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    assert(n <= ctx->srcBufferLoaded);
    ctx->srcBufferLoaded -= n;
    ctx->srcBuffer += n;
}

/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
    return (IOJob_t*) ctx->currentJobHeld;
}

/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns when srcBuffer has at least the expected number of bytes loaded or when we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might hold up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t *job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;
    if(useCoalesce) {
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        ctx->srcBuffer = (U8 *) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}
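
/* A minimal consumer-loop sketch, assuming a hypothetical consumeChunk() step
 * (srcBuffer may hold up to 2*jobBufferSize bytes, as noted above):
 *
 *   AIO_ReadPool_setFile(readCtx, srcFile);
 *   for (;;) {
 *       AIO_ReadPool_fillBuffer(readCtx, wantedBytes);
 *       if (readCtx->srcBufferLoaded == 0) break;   -- EOF reached
 *       consumed = consumeChunk(readCtx->srcBuffer, readCtx->srcBufferLoaded);
 *       AIO_ReadPool_consumeBytes(readCtx, consumed);
 *   }
 *   AIO_ReadPool_closeFile(readCtx);
 */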

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with jobBufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    FILE* const file = AIO_ReadPool_getFile(ctx);
    AIO_ReadPool_setFile(ctx, NULL);
    return fclose(file);
}

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
    AIO_IOPool_setThreaded(&ctx->base, async);
}