xref: /aosp_15_r20/external/lz4/programs/threadpool.c (revision 27162e4e17433d5aa7cb38e7b6a433a09405fc7f)
/*
  threadpool.c - part of lz4 project
  Copyright (C) Yann Collet 2023
  GPL v2 License

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License along
  with this program; if not, write to the Free Software Foundation, Inc.,
  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

  You can contact the author at :
  - LZ4 source repository : https://github.com/lz4/lz4
  - LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
*/
24*27162e4eSAndroid Build Coastguard Worker 
25*27162e4eSAndroid Build Coastguard Worker 
26*27162e4eSAndroid Build Coastguard Worker /* ======   Dependencies   ======= */
27*27162e4eSAndroid Build Coastguard Worker #include <assert.h>
28*27162e4eSAndroid Build Coastguard Worker #include "lz4conf.h"  /* LZ4IO_MULTITHREAD */
29*27162e4eSAndroid Build Coastguard Worker #include "threadpool.h"
30*27162e4eSAndroid Build Coastguard Worker 
31*27162e4eSAndroid Build Coastguard Worker 
32*27162e4eSAndroid Build Coastguard Worker /* ======   Compiler specifics   ====== */
33*27162e4eSAndroid Build Coastguard Worker #if defined(_MSC_VER)
34*27162e4eSAndroid Build Coastguard Worker #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
35*27162e4eSAndroid Build Coastguard Worker #endif
36*27162e4eSAndroid Build Coastguard Worker 
37*27162e4eSAndroid Build Coastguard Worker #if !LZ4IO_MULTITHREAD
38*27162e4eSAndroid Build Coastguard Worker 
39*27162e4eSAndroid Build Coastguard Worker /* ===================================================== */
40*27162e4eSAndroid Build Coastguard Worker /* Backup implementation with no multi-threading support */
41*27162e4eSAndroid Build Coastguard Worker /* ===================================================== */
42*27162e4eSAndroid Build Coastguard Worker 
43*27162e4eSAndroid Build Coastguard Worker /* Non-zero size, to ensure g_poolCtx != NULL */
44*27162e4eSAndroid Build Coastguard Worker struct TPool_s {
45*27162e4eSAndroid Build Coastguard Worker     int dummy;
46*27162e4eSAndroid Build Coastguard Worker };
47*27162e4eSAndroid Build Coastguard Worker static TPool g_poolCtx;
48*27162e4eSAndroid Build Coastguard Worker 
TPool_create(int numThreads,int queueSize)49*27162e4eSAndroid Build Coastguard Worker TPool* TPool_create(int numThreads, int queueSize) {
50*27162e4eSAndroid Build Coastguard Worker     (void)numThreads;
51*27162e4eSAndroid Build Coastguard Worker     (void)queueSize;
52*27162e4eSAndroid Build Coastguard Worker     return &g_poolCtx;
53*27162e4eSAndroid Build Coastguard Worker }
54*27162e4eSAndroid Build Coastguard Worker 
/* TPool_free() (single-threaded build) :
 * Nothing is released. When assertions are enabled, the handle is checked
 * to be either NULL or the static placeholder context. */
void TPool_free(TPool* ctx)
{
    assert(ctx == &g_poolCtx || !ctx);
    (void)ctx;
}
59*27162e4eSAndroid Build Coastguard Worker 
/* TPool_submitJob() (single-threaded build) :
 * Invokes job_function(arg) directly on the calling thread;
 * the job has finished by the time this function returns.
 * @ctx is not consulted. */
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
    (void)ctx;
    (*job_function)(arg);
}
64*27162e4eSAndroid Build Coastguard Worker 
/* TPool_jobsCompleted() (single-threaded build) :
 * Returns immediately: there is nothing to wait for here.
 * When assertions are enabled, the handle is checked to be either NULL
 * or the static placeholder context. */
void TPool_jobsCompleted(TPool* ctx)
{
    assert(ctx == &g_poolCtx || !ctx);
    (void)ctx;
}
69*27162e4eSAndroid Build Coastguard Worker 
70*27162e4eSAndroid Build Coastguard Worker 
71*27162e4eSAndroid Build Coastguard Worker #elif defined(_WIN32)
72*27162e4eSAndroid Build Coastguard Worker 
/* Windows TPool implementation using I/O Completion Ports */
74*27162e4eSAndroid Build Coastguard Worker #include <windows.h>
75*27162e4eSAndroid Build Coastguard Worker 
76*27162e4eSAndroid Build Coastguard Worker typedef struct TPool_s {
77*27162e4eSAndroid Build Coastguard Worker     HANDLE completionPort;
78*27162e4eSAndroid Build Coastguard Worker     HANDLE* workerThreads;
79*27162e4eSAndroid Build Coastguard Worker     int nbWorkers;
80*27162e4eSAndroid Build Coastguard Worker     int queueSize;
81*27162e4eSAndroid Build Coastguard Worker     LONG nbPendingJobs;
82*27162e4eSAndroid Build Coastguard Worker     HANDLE jobSlotAvail;  /* For queue size control */
83*27162e4eSAndroid Build Coastguard Worker     HANDLE allJobsCompleted; /* Event */
84*27162e4eSAndroid Build Coastguard Worker } TPool;
85*27162e4eSAndroid Build Coastguard Worker 
/* TPool_free() :
 * Stops every worker thread, waits for them to exit, then releases all
 * resources owned by @pool. Accepts NULL (no-op).
 *
 * Also used by TPool_create() to unwind a partially constructed pool,
 * so every member is checked before use.
 */
void TPool_free(TPool* pool)
{
    int i;

    if (!pool) return;

    if (pool->workerThreads != NULL) {
        /* Signal workers to exit by posting one empty completion per worker.
         * Posting more signals than there are live threads is harmless. */
        if (pool->completionPort != NULL) {
            for (i = 0; i < pool->nbWorkers; i++) {
                PostQueuedCompletionStatus(pool->completionPort, 0, 0, NULL);
            }
        }

        /* Wait for each worker individually, then close its handle.
         * WaitForMultipleObjects() accepts at most MAXIMUM_WAIT_OBJECTS handles,
         * while nbWorkers is only capped by LZ4_NBWORKERS_MAX, which may be larger
         * (NOTE(review): confirm against lz4conf.h). A failed bulk wait would let
         * the pool be freed while workers are still running.
         * Threads are created in index order, so the first NULL handle marks
         * the end of the threads that were actually started. */
        for (i = 0; i < pool->nbWorkers; i++) {
            HANDLE const thread = pool->workerThreads[i];
            if (thread == NULL) break;
            WaitForSingleObject(thread, INFINITE);
            CloseHandle(thread);
        }
        free(pool->workerThreads);
    }

    if (pool->completionPort != NULL) CloseHandle(pool->completionPort);

    /* Clean up synchronization objects */
    if (pool->jobSlotAvail != NULL) CloseHandle(pool->jobSlotAvail);
    if (pool->allJobsCompleted != NULL) CloseHandle(pool->allJobsCompleted);

    free(pool);
}
115*27162e4eSAndroid Build Coastguard Worker 
WorkerThread(LPVOID lpParameter)116*27162e4eSAndroid Build Coastguard Worker static DWORD WINAPI WorkerThread(LPVOID lpParameter)
117*27162e4eSAndroid Build Coastguard Worker {
118*27162e4eSAndroid Build Coastguard Worker     TPool* const pool = (TPool*)lpParameter;
119*27162e4eSAndroid Build Coastguard Worker     DWORD bytesTransferred;
120*27162e4eSAndroid Build Coastguard Worker     ULONG_PTR completionKey;
121*27162e4eSAndroid Build Coastguard Worker     LPOVERLAPPED overlapped;
122*27162e4eSAndroid Build Coastguard Worker 
123*27162e4eSAndroid Build Coastguard Worker     while (GetQueuedCompletionStatus(pool->completionPort,
124*27162e4eSAndroid Build Coastguard Worker                                     &bytesTransferred, &completionKey,
125*27162e4eSAndroid Build Coastguard Worker                                     &overlapped, INFINITE)) {
126*27162e4eSAndroid Build Coastguard Worker 
127*27162e4eSAndroid Build Coastguard Worker         /* End signal */
128*27162e4eSAndroid Build Coastguard Worker         if (overlapped == NULL) { break; }
129*27162e4eSAndroid Build Coastguard Worker 
130*27162e4eSAndroid Build Coastguard Worker         /* Execute job */
131*27162e4eSAndroid Build Coastguard Worker         ((void (*)(void*))completionKey)(overlapped);
132*27162e4eSAndroid Build Coastguard Worker 
133*27162e4eSAndroid Build Coastguard Worker         /* Signal job completion */
134*27162e4eSAndroid Build Coastguard Worker         if (InterlockedDecrement(&pool->nbPendingJobs) == 0) {
135*27162e4eSAndroid Build Coastguard Worker             SetEvent(pool->allJobsCompleted);
136*27162e4eSAndroid Build Coastguard Worker         }
137*27162e4eSAndroid Build Coastguard Worker         ReleaseSemaphore(pool->jobSlotAvail, 1, NULL);
138*27162e4eSAndroid Build Coastguard Worker     }
139*27162e4eSAndroid Build Coastguard Worker 
140*27162e4eSAndroid Build Coastguard Worker     return 0;
141*27162e4eSAndroid Build Coastguard Worker }
142*27162e4eSAndroid Build Coastguard Worker 
TPool_create(int nbWorkers,int queueSize)143*27162e4eSAndroid Build Coastguard Worker TPool* TPool_create(int nbWorkers, int queueSize)
144*27162e4eSAndroid Build Coastguard Worker {
145*27162e4eSAndroid Build Coastguard Worker     TPool* pool;
146*27162e4eSAndroid Build Coastguard Worker 
147*27162e4eSAndroid Build Coastguard Worker     /* parameters sanitization */
148*27162e4eSAndroid Build Coastguard Worker     if (nbWorkers <= 0 || queueSize <= 0) return NULL;
149*27162e4eSAndroid Build Coastguard Worker     if (nbWorkers>LZ4_NBWORKERS_MAX) nbWorkers=LZ4_NBWORKERS_MAX;
150*27162e4eSAndroid Build Coastguard Worker 
151*27162e4eSAndroid Build Coastguard Worker     pool = calloc(1, sizeof(TPool));
152*27162e4eSAndroid Build Coastguard Worker     if (!pool) return NULL;
153*27162e4eSAndroid Build Coastguard Worker 
154*27162e4eSAndroid Build Coastguard Worker     /* Create completion port */
155*27162e4eSAndroid Build Coastguard Worker     pool->completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nbWorkers);
156*27162e4eSAndroid Build Coastguard Worker     if (!pool->completionPort) { goto _cleanup; }
157*27162e4eSAndroid Build Coastguard Worker 
158*27162e4eSAndroid Build Coastguard Worker     /* Create worker threads */
159*27162e4eSAndroid Build Coastguard Worker     pool->nbWorkers = nbWorkers;
160*27162e4eSAndroid Build Coastguard Worker     pool->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * nbWorkers);
161*27162e4eSAndroid Build Coastguard Worker     if (pool->workerThreads == NULL) { goto _cleanup; }
162*27162e4eSAndroid Build Coastguard Worker 
163*27162e4eSAndroid Build Coastguard Worker     {   int i;
164*27162e4eSAndroid Build Coastguard Worker         for (i = 0; i < nbWorkers; i++) {
165*27162e4eSAndroid Build Coastguard Worker             pool->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, pool, 0, NULL);
166*27162e4eSAndroid Build Coastguard Worker             if (!pool->workerThreads[i]) { goto _cleanup; }
167*27162e4eSAndroid Build Coastguard Worker         }
168*27162e4eSAndroid Build Coastguard Worker     }
169*27162e4eSAndroid Build Coastguard Worker 
170*27162e4eSAndroid Build Coastguard Worker     /* Initialize sync objects members */
171*27162e4eSAndroid Build Coastguard Worker     pool->queueSize = queueSize;
172*27162e4eSAndroid Build Coastguard Worker     pool->nbPendingJobs = 0;
173*27162e4eSAndroid Build Coastguard Worker 
174*27162e4eSAndroid Build Coastguard Worker     pool->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL);
175*27162e4eSAndroid Build Coastguard Worker     if (!pool->jobSlotAvail) { goto _cleanup; }
176*27162e4eSAndroid Build Coastguard Worker 
177*27162e4eSAndroid Build Coastguard Worker     pool->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL);
178*27162e4eSAndroid Build Coastguard Worker     if (!pool->allJobsCompleted) { goto _cleanup; }
179*27162e4eSAndroid Build Coastguard Worker 
180*27162e4eSAndroid Build Coastguard Worker     return pool;
181*27162e4eSAndroid Build Coastguard Worker 
182*27162e4eSAndroid Build Coastguard Worker _cleanup:
183*27162e4eSAndroid Build Coastguard Worker     TPool_free(pool);
184*27162e4eSAndroid Build Coastguard Worker     return NULL;
185*27162e4eSAndroid Build Coastguard Worker }
186*27162e4eSAndroid Build Coastguard Worker 
187*27162e4eSAndroid Build Coastguard Worker 
/* TPool_submitJob() :
 * Queues job_function(arg) for execution by a worker thread.
 * Blocks while no job slot is available (see jobSlotAvail).
 * If the job cannot be posted to the completion port, it is executed
 * on the calling thread instead, so that it is never silently dropped
 * and the pool's accounting stays balanced.
 *
 * @pool : must not be NULL
 * @job_function : must not be NULL
 * @arg : opaque pointer handed to @job_function
 */
void TPool_submitJob(TPool* pool, void (*job_function)(void*), void* arg)
{
    assert(pool);
    assert(job_function != NULL);

    /* Reserve a job slot, then account for the new pending job */
    WaitForSingleObject(pool->jobSlotAvail, INFINITE);
    ResetEvent(pool->allJobsCompleted);
    InterlockedIncrement(&pool->nbPendingJobs);

    /* Post the job directly to the completion port */
    if (!PostQueuedCompletionStatus(pool->completionPort,
                               0, /* Bytes transferred not used */
                               (ULONG_PTR)job_function, /* Store function pointer in completionKey */
                               (LPOVERLAPPED)arg)) {    /* Store argument in overlapped */
        /* No worker will ever see this job : run it here,
         * then perform the same completion steps a worker would. */
        job_function(arg);
        if (InterlockedDecrement(&pool->nbPendingJobs) == 0) {
            SetEvent(pool->allJobsCompleted);
        }
        ReleaseSemaphore(pool->jobSlotAvail, 1, NULL);
    }
}
203*27162e4eSAndroid Build Coastguard Worker 
/* TPool_jobsCompleted() :
 * Blocks until no submitted job remains pending.
 * Returns immediately when nothing is pending, including when no job
 * was ever submitted or when called twice in a row.
 *
 * @pool : must not be NULL
 */
void TPool_jobsCompleted(TPool* pool)
{
    assert(pool);
    /* allJobsCompleted is an auto-reset event : a single wait on it would
     * block forever if it was never (or no longer) signaled, and could
     * return early on a signal left over from a previous batch.
     * The pending counter is therefore the source of truth, and the event
     * only serves as a wake-up.
     * InterlockedCompareExchange(x, 0, 0) never changes the value :
     * it is used here purely as an atomic read. */
    while (InterlockedCompareExchange(&pool->nbPendingJobs, 0, 0) != 0) {
        WaitForSingleObject(pool->allJobsCompleted, INFINITE);
    }
}
209*27162e4eSAndroid Build Coastguard Worker 
210*27162e4eSAndroid Build Coastguard Worker #else
211*27162e4eSAndroid Build Coastguard Worker 
212*27162e4eSAndroid Build Coastguard Worker /* pthread availability assumed */
213*27162e4eSAndroid Build Coastguard Worker #include <stdlib.h>  /* malloc, free */
214*27162e4eSAndroid Build Coastguard Worker #include <pthread.h> /* pthread_* */
215*27162e4eSAndroid Build Coastguard Worker 
/* One unit of work : a callback plus the opaque pointer handed to it */
typedef struct TPool_job_s {
    void (*job_function)(void*);
    void* arg;
} TPool_job;
221*27162e4eSAndroid Build Coastguard Worker 
222*27162e4eSAndroid Build Coastguard Worker struct TPool_s {
223*27162e4eSAndroid Build Coastguard Worker     pthread_t* threads;
224*27162e4eSAndroid Build Coastguard Worker     size_t threadCapacity;
225*27162e4eSAndroid Build Coastguard Worker     size_t threadLimit;
226*27162e4eSAndroid Build Coastguard Worker 
227*27162e4eSAndroid Build Coastguard Worker     /* The queue is a circular buffer */
228*27162e4eSAndroid Build Coastguard Worker     TPool_job* queue;
229*27162e4eSAndroid Build Coastguard Worker     size_t queueHead;
230*27162e4eSAndroid Build Coastguard Worker     size_t queueTail;
231*27162e4eSAndroid Build Coastguard Worker     size_t queueSize;
232*27162e4eSAndroid Build Coastguard Worker 
233*27162e4eSAndroid Build Coastguard Worker     /* The number of threads working on jobs */
234*27162e4eSAndroid Build Coastguard Worker     size_t numThreadsBusy;
235*27162e4eSAndroid Build Coastguard Worker     /* Indicates if the queue is empty */
236*27162e4eSAndroid Build Coastguard Worker     int queueEmpty;
237*27162e4eSAndroid Build Coastguard Worker 
238*27162e4eSAndroid Build Coastguard Worker     /* The mutex protects the queue */
239*27162e4eSAndroid Build Coastguard Worker     pthread_mutex_t queueMutex;
240*27162e4eSAndroid Build Coastguard Worker     /* Condition variable for pushers to wait on when the queue is full */
241*27162e4eSAndroid Build Coastguard Worker     pthread_cond_t queuePushCond;
242*27162e4eSAndroid Build Coastguard Worker     /* Condition variables for poppers to wait on when the queue is empty */
243*27162e4eSAndroid Build Coastguard Worker     pthread_cond_t queuePopCond;
244*27162e4eSAndroid Build Coastguard Worker     /* Indicates if the queue is shutting down */
245*27162e4eSAndroid Build Coastguard Worker     int shutdown;
246*27162e4eSAndroid Build Coastguard Worker };
247*27162e4eSAndroid Build Coastguard Worker 
248*27162e4eSAndroid Build Coastguard Worker static void TPool_shutdown(TPool* ctx);
249*27162e4eSAndroid Build Coastguard Worker 
TPool_free(TPool * ctx)250*27162e4eSAndroid Build Coastguard Worker void TPool_free(TPool* ctx) {
251*27162e4eSAndroid Build Coastguard Worker     if (!ctx) { return; }
252*27162e4eSAndroid Build Coastguard Worker     TPool_shutdown(ctx);
253*27162e4eSAndroid Build Coastguard Worker     pthread_mutex_destroy(&ctx->queueMutex);
254*27162e4eSAndroid Build Coastguard Worker     pthread_cond_destroy(&ctx->queuePushCond);
255*27162e4eSAndroid Build Coastguard Worker     pthread_cond_destroy(&ctx->queuePopCond);
256*27162e4eSAndroid Build Coastguard Worker     free(ctx->queue);
257*27162e4eSAndroid Build Coastguard Worker     free(ctx->threads);
258*27162e4eSAndroid Build Coastguard Worker     free(ctx);
259*27162e4eSAndroid Build Coastguard Worker }
260*27162e4eSAndroid Build Coastguard Worker 
261*27162e4eSAndroid Build Coastguard Worker static void* TPool_thread(void* opaque);
262*27162e4eSAndroid Build Coastguard Worker 
TPool_create(int nbThreads,int queueSize)263*27162e4eSAndroid Build Coastguard Worker TPool* TPool_create(int nbThreads, int queueSize)
264*27162e4eSAndroid Build Coastguard Worker {
265*27162e4eSAndroid Build Coastguard Worker     TPool* ctx;
266*27162e4eSAndroid Build Coastguard Worker     /* Check parameters */
267*27162e4eSAndroid Build Coastguard Worker     if (nbThreads<1 || queueSize<1) { return NULL; }
268*27162e4eSAndroid Build Coastguard Worker     /* Allocate the context and zero initialize */
269*27162e4eSAndroid Build Coastguard Worker     ctx = (TPool*)calloc(1, sizeof(TPool));
270*27162e4eSAndroid Build Coastguard Worker     if (!ctx) { return NULL; }
271*27162e4eSAndroid Build Coastguard Worker     /* init pthread variables */
272*27162e4eSAndroid Build Coastguard Worker     {   int error = 0;
273*27162e4eSAndroid Build Coastguard Worker         error |= pthread_mutex_init(&ctx->queueMutex, NULL);
274*27162e4eSAndroid Build Coastguard Worker         error |= pthread_cond_init(&ctx->queuePushCond, NULL);
275*27162e4eSAndroid Build Coastguard Worker         error |= pthread_cond_init(&ctx->queuePopCond, NULL);
276*27162e4eSAndroid Build Coastguard Worker         if (error) { TPool_free(ctx); return NULL; }
277*27162e4eSAndroid Build Coastguard Worker     }
278*27162e4eSAndroid Build Coastguard Worker     /* Initialize the job queue.
279*27162e4eSAndroid Build Coastguard Worker      * It needs one extra space since one space is wasted to differentiate
280*27162e4eSAndroid Build Coastguard Worker      * empty and full queues.
281*27162e4eSAndroid Build Coastguard Worker      */
282*27162e4eSAndroid Build Coastguard Worker     ctx->queueSize = (size_t)queueSize + 1;
283*27162e4eSAndroid Build Coastguard Worker     ctx->queue = (TPool_job*)calloc(1, ctx->queueSize * sizeof(TPool_job));
284*27162e4eSAndroid Build Coastguard Worker     if (ctx->queue == NULL) {
285*27162e4eSAndroid Build Coastguard Worker         TPool_free(ctx);
286*27162e4eSAndroid Build Coastguard Worker         return NULL;
287*27162e4eSAndroid Build Coastguard Worker     }
288*27162e4eSAndroid Build Coastguard Worker     ctx->queueHead = 0;
289*27162e4eSAndroid Build Coastguard Worker     ctx->queueTail = 0;
290*27162e4eSAndroid Build Coastguard Worker     ctx->numThreadsBusy = 0;
291*27162e4eSAndroid Build Coastguard Worker     ctx->queueEmpty = 1;
292*27162e4eSAndroid Build Coastguard Worker     ctx->shutdown = 0;
293*27162e4eSAndroid Build Coastguard Worker     /* Allocate space for the thread handles */
294*27162e4eSAndroid Build Coastguard Worker     ctx->threads = (pthread_t*)calloc(1, (size_t)nbThreads * sizeof(pthread_t));
295*27162e4eSAndroid Build Coastguard Worker     if (ctx->threads == NULL) {
296*27162e4eSAndroid Build Coastguard Worker         TPool_free(ctx);
297*27162e4eSAndroid Build Coastguard Worker         return NULL;
298*27162e4eSAndroid Build Coastguard Worker     }
299*27162e4eSAndroid Build Coastguard Worker     ctx->threadCapacity = 0;
300*27162e4eSAndroid Build Coastguard Worker     /* Initialize the threads */
301*27162e4eSAndroid Build Coastguard Worker     {   int i;
302*27162e4eSAndroid Build Coastguard Worker         for (i = 0; i < nbThreads; ++i) {
303*27162e4eSAndroid Build Coastguard Worker             if (pthread_create(&ctx->threads[i], NULL, &TPool_thread, ctx)) {
304*27162e4eSAndroid Build Coastguard Worker                 ctx->threadCapacity = (size_t)i;
305*27162e4eSAndroid Build Coastguard Worker                 TPool_free(ctx);
306*27162e4eSAndroid Build Coastguard Worker                 return NULL;
307*27162e4eSAndroid Build Coastguard Worker         }   }
308*27162e4eSAndroid Build Coastguard Worker         ctx->threadCapacity = (size_t)nbThreads;
309*27162e4eSAndroid Build Coastguard Worker         ctx->threadLimit = (size_t)nbThreads;
310*27162e4eSAndroid Build Coastguard Worker     }
311*27162e4eSAndroid Build Coastguard Worker     return ctx;
312*27162e4eSAndroid Build Coastguard Worker }
313*27162e4eSAndroid Build Coastguard Worker 
314*27162e4eSAndroid Build Coastguard Worker /* TPool_thread() :
315*27162e4eSAndroid Build Coastguard Worker  * Work thread for the thread pool.
316*27162e4eSAndroid Build Coastguard Worker  * Waits for jobs and executes them.
317*27162e4eSAndroid Build Coastguard Worker  * @returns : NULL on failure else non-null.
318*27162e4eSAndroid Build Coastguard Worker  */
TPool_thread(void * opaque)319*27162e4eSAndroid Build Coastguard Worker static void* TPool_thread(void* opaque) {
320*27162e4eSAndroid Build Coastguard Worker     TPool* const ctx = (TPool*)opaque;
321*27162e4eSAndroid Build Coastguard Worker     if (!ctx) { return NULL; }
322*27162e4eSAndroid Build Coastguard Worker     for (;;) {
323*27162e4eSAndroid Build Coastguard Worker         /* Lock the mutex and wait for a non-empty queue or until shutdown */
324*27162e4eSAndroid Build Coastguard Worker         pthread_mutex_lock(&ctx->queueMutex);
325*27162e4eSAndroid Build Coastguard Worker 
326*27162e4eSAndroid Build Coastguard Worker         while ( ctx->queueEmpty
327*27162e4eSAndroid Build Coastguard Worker             || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
328*27162e4eSAndroid Build Coastguard Worker             if (ctx->shutdown) {
329*27162e4eSAndroid Build Coastguard Worker                 /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
330*27162e4eSAndroid Build Coastguard Worker                  * a few threads will be shutdown while !queueEmpty,
331*27162e4eSAndroid Build Coastguard Worker                  * but enough threads will remain active to finish the queue */
332*27162e4eSAndroid Build Coastguard Worker                 pthread_mutex_unlock(&ctx->queueMutex);
333*27162e4eSAndroid Build Coastguard Worker                 return opaque;
334*27162e4eSAndroid Build Coastguard Worker             }
335*27162e4eSAndroid Build Coastguard Worker             pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
336*27162e4eSAndroid Build Coastguard Worker         }
337*27162e4eSAndroid Build Coastguard Worker         /* Pop a job off the queue */
338*27162e4eSAndroid Build Coastguard Worker         {   TPool_job const job = ctx->queue[ctx->queueHead];
339*27162e4eSAndroid Build Coastguard Worker             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
340*27162e4eSAndroid Build Coastguard Worker             ctx->numThreadsBusy++;
341*27162e4eSAndroid Build Coastguard Worker             ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
342*27162e4eSAndroid Build Coastguard Worker             /* Unlock the mutex, signal a pusher, and run the job */
343*27162e4eSAndroid Build Coastguard Worker             pthread_cond_signal(&ctx->queuePushCond);
344*27162e4eSAndroid Build Coastguard Worker             pthread_mutex_unlock(&ctx->queueMutex);
345*27162e4eSAndroid Build Coastguard Worker 
346*27162e4eSAndroid Build Coastguard Worker             job.job_function(job.arg);
347*27162e4eSAndroid Build Coastguard Worker 
348*27162e4eSAndroid Build Coastguard Worker             /* If the intended queue size was 0, signal after finishing job */
349*27162e4eSAndroid Build Coastguard Worker             pthread_mutex_lock(&ctx->queueMutex);
350*27162e4eSAndroid Build Coastguard Worker             ctx->numThreadsBusy--;
351*27162e4eSAndroid Build Coastguard Worker             pthread_cond_signal(&ctx->queuePushCond);
352*27162e4eSAndroid Build Coastguard Worker             pthread_mutex_unlock(&ctx->queueMutex);
353*27162e4eSAndroid Build Coastguard Worker         }
354*27162e4eSAndroid Build Coastguard Worker     }  /* for (;;) */
355*27162e4eSAndroid Build Coastguard Worker     assert(0);  /* Unreachable */
356*27162e4eSAndroid Build Coastguard Worker }
357*27162e4eSAndroid Build Coastguard Worker 
358*27162e4eSAndroid Build Coastguard Worker /*! TPool_shutdown() :
359*27162e4eSAndroid Build Coastguard Worker     Shutdown the queue, wake any sleeping threads, and join all of the threads.
360*27162e4eSAndroid Build Coastguard Worker */
TPool_shutdown(TPool * ctx)361*27162e4eSAndroid Build Coastguard Worker static void TPool_shutdown(TPool* ctx) {
362*27162e4eSAndroid Build Coastguard Worker     /* Shut down the queue */
363*27162e4eSAndroid Build Coastguard Worker     pthread_mutex_lock(&ctx->queueMutex);
364*27162e4eSAndroid Build Coastguard Worker     ctx->shutdown = 1;
365*27162e4eSAndroid Build Coastguard Worker     pthread_mutex_unlock(&ctx->queueMutex);
366*27162e4eSAndroid Build Coastguard Worker     /* Wake up sleeping threads */
367*27162e4eSAndroid Build Coastguard Worker     pthread_cond_broadcast(&ctx->queuePushCond);
368*27162e4eSAndroid Build Coastguard Worker     pthread_cond_broadcast(&ctx->queuePopCond);
369*27162e4eSAndroid Build Coastguard Worker     /* Join all of the threads */
370*27162e4eSAndroid Build Coastguard Worker     {   size_t i;
371*27162e4eSAndroid Build Coastguard Worker         for (i = 0; i < ctx->threadCapacity; ++i) {
372*27162e4eSAndroid Build Coastguard Worker             pthread_join(ctx->threads[i], NULL);  /* note : could fail */
373*27162e4eSAndroid Build Coastguard Worker     }   }
374*27162e4eSAndroid Build Coastguard Worker }
375*27162e4eSAndroid Build Coastguard Worker 
376*27162e4eSAndroid Build Coastguard Worker 
377*27162e4eSAndroid Build Coastguard Worker /*! TPool_jobsCompleted() :
378*27162e4eSAndroid Build Coastguard Worker  *  Waits for all queued jobs to finish executing.
379*27162e4eSAndroid Build Coastguard Worker  */
TPool_jobsCompleted(TPool * ctx)380*27162e4eSAndroid Build Coastguard Worker void TPool_jobsCompleted(TPool* ctx){
381*27162e4eSAndroid Build Coastguard Worker     pthread_mutex_lock(&ctx->queueMutex);
382*27162e4eSAndroid Build Coastguard Worker     while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
383*27162e4eSAndroid Build Coastguard Worker         pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
384*27162e4eSAndroid Build Coastguard Worker     }
385*27162e4eSAndroid Build Coastguard Worker     pthread_mutex_unlock(&ctx->queueMutex);
386*27162e4eSAndroid Build Coastguard Worker }
387*27162e4eSAndroid Build Coastguard Worker 
388*27162e4eSAndroid Build Coastguard Worker /**
389*27162e4eSAndroid Build Coastguard Worker  * Returns 1 if the queue is full and 0 otherwise.
390*27162e4eSAndroid Build Coastguard Worker  *
391*27162e4eSAndroid Build Coastguard Worker  * When queueSize is 1 (pool was created with an intended queueSize of 0),
392*27162e4eSAndroid Build Coastguard Worker  * then a queue is empty if there is a thread free _and_ no job is waiting.
393*27162e4eSAndroid Build Coastguard Worker  */
isQueueFull(TPool const * ctx)394*27162e4eSAndroid Build Coastguard Worker static int isQueueFull(TPool const* ctx) {
395*27162e4eSAndroid Build Coastguard Worker     if (ctx->queueSize > 1) {
396*27162e4eSAndroid Build Coastguard Worker         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
397*27162e4eSAndroid Build Coastguard Worker     } else {
398*27162e4eSAndroid Build Coastguard Worker         return (ctx->numThreadsBusy == ctx->threadLimit) ||
399*27162e4eSAndroid Build Coastguard Worker                !ctx->queueEmpty;
400*27162e4eSAndroid Build Coastguard Worker     }
401*27162e4eSAndroid Build Coastguard Worker }
402*27162e4eSAndroid Build Coastguard Worker 
403*27162e4eSAndroid Build Coastguard Worker static void
TPool_submitJob_internal(TPool * ctx,void (* job_function)(void *),void * arg)404*27162e4eSAndroid Build Coastguard Worker TPool_submitJob_internal(TPool* ctx, void (*job_function)(void*), void *arg)
405*27162e4eSAndroid Build Coastguard Worker {
406*27162e4eSAndroid Build Coastguard Worker     TPool_job job;
407*27162e4eSAndroid Build Coastguard Worker     job.job_function = job_function;
408*27162e4eSAndroid Build Coastguard Worker     job.arg = arg;
409*27162e4eSAndroid Build Coastguard Worker     assert(ctx != NULL);
410*27162e4eSAndroid Build Coastguard Worker     if (ctx->shutdown) return;
411*27162e4eSAndroid Build Coastguard Worker 
412*27162e4eSAndroid Build Coastguard Worker     ctx->queueEmpty = 0;
413*27162e4eSAndroid Build Coastguard Worker     ctx->queue[ctx->queueTail] = job;
414*27162e4eSAndroid Build Coastguard Worker     ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
415*27162e4eSAndroid Build Coastguard Worker     pthread_cond_signal(&ctx->queuePopCond);
416*27162e4eSAndroid Build Coastguard Worker }
417*27162e4eSAndroid Build Coastguard Worker 
/* TPool_submitJob() :
 * Adds job_function(arg) to the queue, blocking while the queue is full.
 * The wait also ends when the pool starts shutting down, in which case
 * the job is not queued (see TPool_submitJob_internal()).
 * @ctx : must not be NULL */
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
    assert(ctx != NULL);

    pthread_mutex_lock(&ctx->queueMutex);
    /* Wait until there is space in the queue for the new job */
    for (;;) {
        if (ctx->shutdown) break;
        if (!isQueueFull(ctx)) break;
        pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    TPool_submitJob_internal(ctx, job_function, arg);
    pthread_mutex_unlock(&ctx->queueMutex);
}
429*27162e4eSAndroid Build Coastguard Worker 
#endif  /* LZ4IO_MULTITHREAD */
431