1*27162e4eSAndroid Build Coastguard Worker /*
2*27162e4eSAndroid Build Coastguard Worker threadpool.h - part of lz4 project
3*27162e4eSAndroid Build Coastguard Worker Copyright (C) Yann Collet 2023
4*27162e4eSAndroid Build Coastguard Worker GPL v2 License
5*27162e4eSAndroid Build Coastguard Worker
6*27162e4eSAndroid Build Coastguard Worker This program is free software; you can redistribute it and/or modify
7*27162e4eSAndroid Build Coastguard Worker it under the terms of the GNU General Public License as published by
8*27162e4eSAndroid Build Coastguard Worker the Free Software Foundation; either version 2 of the License, or
9*27162e4eSAndroid Build Coastguard Worker (at your option) any later version.
10*27162e4eSAndroid Build Coastguard Worker
11*27162e4eSAndroid Build Coastguard Worker This program is distributed in the hope that it will be useful,
12*27162e4eSAndroid Build Coastguard Worker but WITHOUT ANY WARRANTY; without even the implied warranty of
13*27162e4eSAndroid Build Coastguard Worker MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14*27162e4eSAndroid Build Coastguard Worker GNU General Public License for more details.
15*27162e4eSAndroid Build Coastguard Worker
16*27162e4eSAndroid Build Coastguard Worker You should have received a copy of the GNU General Public License along
17*27162e4eSAndroid Build Coastguard Worker with this program; if not, write to the Free Software Foundation, Inc.,
18*27162e4eSAndroid Build Coastguard Worker 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19*27162e4eSAndroid Build Coastguard Worker
20*27162e4eSAndroid Build Coastguard Worker You can contact the author at :
21*27162e4eSAndroid Build Coastguard Worker - LZ4 source repository : https://github.com/lz4/lz4
22*27162e4eSAndroid Build Coastguard Worker - LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
23*27162e4eSAndroid Build Coastguard Worker */
24*27162e4eSAndroid Build Coastguard Worker
25*27162e4eSAndroid Build Coastguard Worker
26*27162e4eSAndroid Build Coastguard Worker /* ====== Dependencies ======= */
27*27162e4eSAndroid Build Coastguard Worker #include <assert.h>
28*27162e4eSAndroid Build Coastguard Worker #include "lz4conf.h" /* LZ4IO_MULTITHREAD */
29*27162e4eSAndroid Build Coastguard Worker #include "threadpool.h"
30*27162e4eSAndroid Build Coastguard Worker
31*27162e4eSAndroid Build Coastguard Worker
32*27162e4eSAndroid Build Coastguard Worker /* ====== Compiler specifics ====== */
33*27162e4eSAndroid Build Coastguard Worker #if defined(_MSC_VER)
34*27162e4eSAndroid Build Coastguard Worker # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
35*27162e4eSAndroid Build Coastguard Worker #endif
36*27162e4eSAndroid Build Coastguard Worker
37*27162e4eSAndroid Build Coastguard Worker #if !LZ4IO_MULTITHREAD
38*27162e4eSAndroid Build Coastguard Worker
39*27162e4eSAndroid Build Coastguard Worker /* ===================================================== */
40*27162e4eSAndroid Build Coastguard Worker /* Backup implementation with no multi-threading support */
41*27162e4eSAndroid Build Coastguard Worker /* ===================================================== */
42*27162e4eSAndroid Build Coastguard Worker
43*27162e4eSAndroid Build Coastguard Worker /* Non-zero size, to ensure g_poolCtx != NULL */
44*27162e4eSAndroid Build Coastguard Worker struct TPool_s {
45*27162e4eSAndroid Build Coastguard Worker int dummy;
46*27162e4eSAndroid Build Coastguard Worker };
47*27162e4eSAndroid Build Coastguard Worker static TPool g_poolCtx;
48*27162e4eSAndroid Build Coastguard Worker
TPool_create(int numThreads,int queueSize)49*27162e4eSAndroid Build Coastguard Worker TPool* TPool_create(int numThreads, int queueSize) {
50*27162e4eSAndroid Build Coastguard Worker (void)numThreads;
51*27162e4eSAndroid Build Coastguard Worker (void)queueSize;
52*27162e4eSAndroid Build Coastguard Worker return &g_poolCtx;
53*27162e4eSAndroid Build Coastguard Worker }
54*27162e4eSAndroid Build Coastguard Worker
TPool_free(TPool * ctx)55*27162e4eSAndroid Build Coastguard Worker void TPool_free(TPool* ctx) {
56*27162e4eSAndroid Build Coastguard Worker assert(!ctx || ctx == &g_poolCtx);
57*27162e4eSAndroid Build Coastguard Worker (void)ctx;
58*27162e4eSAndroid Build Coastguard Worker }
59*27162e4eSAndroid Build Coastguard Worker
TPool_submitJob(TPool * ctx,void (* job_function)(void *),void * arg)60*27162e4eSAndroid Build Coastguard Worker void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg) {
61*27162e4eSAndroid Build Coastguard Worker (void)ctx;
62*27162e4eSAndroid Build Coastguard Worker job_function(arg);
63*27162e4eSAndroid Build Coastguard Worker }
64*27162e4eSAndroid Build Coastguard Worker
TPool_jobsCompleted(TPool * ctx)65*27162e4eSAndroid Build Coastguard Worker void TPool_jobsCompleted(TPool* ctx) {
66*27162e4eSAndroid Build Coastguard Worker assert(!ctx || ctx == &g_poolCtx);
67*27162e4eSAndroid Build Coastguard Worker (void)ctx;
68*27162e4eSAndroid Build Coastguard Worker }
69*27162e4eSAndroid Build Coastguard Worker
70*27162e4eSAndroid Build Coastguard Worker
71*27162e4eSAndroid Build Coastguard Worker #elif defined(_WIN32)
72*27162e4eSAndroid Build Coastguard Worker
73*27162e4eSAndroid Build Coastguard Worker /* Window TPool implementation using Completion Ports */
74*27162e4eSAndroid Build Coastguard Worker #include <windows.h>
75*27162e4eSAndroid Build Coastguard Worker
76*27162e4eSAndroid Build Coastguard Worker typedef struct TPool_s {
77*27162e4eSAndroid Build Coastguard Worker HANDLE completionPort;
78*27162e4eSAndroid Build Coastguard Worker HANDLE* workerThreads;
79*27162e4eSAndroid Build Coastguard Worker int nbWorkers;
80*27162e4eSAndroid Build Coastguard Worker int queueSize;
81*27162e4eSAndroid Build Coastguard Worker LONG nbPendingJobs;
82*27162e4eSAndroid Build Coastguard Worker HANDLE jobSlotAvail; /* For queue size control */
83*27162e4eSAndroid Build Coastguard Worker HANDLE allJobsCompleted; /* Event */
84*27162e4eSAndroid Build Coastguard Worker } TPool;
85*27162e4eSAndroid Build Coastguard Worker
TPool_free(TPool * pool)86*27162e4eSAndroid Build Coastguard Worker void TPool_free(TPool* pool)
87*27162e4eSAndroid Build Coastguard Worker {
88*27162e4eSAndroid Build Coastguard Worker if (!pool) return;
89*27162e4eSAndroid Build Coastguard Worker
90*27162e4eSAndroid Build Coastguard Worker /* Signal workers to exit by posting NULL completions */
91*27162e4eSAndroid Build Coastguard Worker { int i;
92*27162e4eSAndroid Build Coastguard Worker for (i = 0; i < pool->nbWorkers; i++) {
93*27162e4eSAndroid Build Coastguard Worker PostQueuedCompletionStatus(pool->completionPort, 0, 0, NULL);
94*27162e4eSAndroid Build Coastguard Worker }
95*27162e4eSAndroid Build Coastguard Worker }
96*27162e4eSAndroid Build Coastguard Worker
97*27162e4eSAndroid Build Coastguard Worker /* Wait for worker threads to finish */
98*27162e4eSAndroid Build Coastguard Worker WaitForMultipleObjects(pool->nbWorkers, pool->workerThreads, TRUE, INFINITE);
99*27162e4eSAndroid Build Coastguard Worker
100*27162e4eSAndroid Build Coastguard Worker /* Close thread handles and completion port */
101*27162e4eSAndroid Build Coastguard Worker { int i;
102*27162e4eSAndroid Build Coastguard Worker for (i = 0; i < pool->nbWorkers; i++) {
103*27162e4eSAndroid Build Coastguard Worker CloseHandle(pool->workerThreads[i]);
104*27162e4eSAndroid Build Coastguard Worker }
105*27162e4eSAndroid Build Coastguard Worker }
106*27162e4eSAndroid Build Coastguard Worker free(pool->workerThreads);
107*27162e4eSAndroid Build Coastguard Worker CloseHandle(pool->completionPort);
108*27162e4eSAndroid Build Coastguard Worker
109*27162e4eSAndroid Build Coastguard Worker /* Clean up synchronization objects */
110*27162e4eSAndroid Build Coastguard Worker CloseHandle(pool->jobSlotAvail);
111*27162e4eSAndroid Build Coastguard Worker CloseHandle(pool->allJobsCompleted);
112*27162e4eSAndroid Build Coastguard Worker
113*27162e4eSAndroid Build Coastguard Worker free(pool);
114*27162e4eSAndroid Build Coastguard Worker }
115*27162e4eSAndroid Build Coastguard Worker
WorkerThread(LPVOID lpParameter)116*27162e4eSAndroid Build Coastguard Worker static DWORD WINAPI WorkerThread(LPVOID lpParameter)
117*27162e4eSAndroid Build Coastguard Worker {
118*27162e4eSAndroid Build Coastguard Worker TPool* const pool = (TPool*)lpParameter;
119*27162e4eSAndroid Build Coastguard Worker DWORD bytesTransferred;
120*27162e4eSAndroid Build Coastguard Worker ULONG_PTR completionKey;
121*27162e4eSAndroid Build Coastguard Worker LPOVERLAPPED overlapped;
122*27162e4eSAndroid Build Coastguard Worker
123*27162e4eSAndroid Build Coastguard Worker while (GetQueuedCompletionStatus(pool->completionPort,
124*27162e4eSAndroid Build Coastguard Worker &bytesTransferred, &completionKey,
125*27162e4eSAndroid Build Coastguard Worker &overlapped, INFINITE)) {
126*27162e4eSAndroid Build Coastguard Worker
127*27162e4eSAndroid Build Coastguard Worker /* End signal */
128*27162e4eSAndroid Build Coastguard Worker if (overlapped == NULL) { break; }
129*27162e4eSAndroid Build Coastguard Worker
130*27162e4eSAndroid Build Coastguard Worker /* Execute job */
131*27162e4eSAndroid Build Coastguard Worker ((void (*)(void*))completionKey)(overlapped);
132*27162e4eSAndroid Build Coastguard Worker
133*27162e4eSAndroid Build Coastguard Worker /* Signal job completion */
134*27162e4eSAndroid Build Coastguard Worker if (InterlockedDecrement(&pool->nbPendingJobs) == 0) {
135*27162e4eSAndroid Build Coastguard Worker SetEvent(pool->allJobsCompleted);
136*27162e4eSAndroid Build Coastguard Worker }
137*27162e4eSAndroid Build Coastguard Worker ReleaseSemaphore(pool->jobSlotAvail, 1, NULL);
138*27162e4eSAndroid Build Coastguard Worker }
139*27162e4eSAndroid Build Coastguard Worker
140*27162e4eSAndroid Build Coastguard Worker return 0;
141*27162e4eSAndroid Build Coastguard Worker }
142*27162e4eSAndroid Build Coastguard Worker
TPool_create(int nbWorkers,int queueSize)143*27162e4eSAndroid Build Coastguard Worker TPool* TPool_create(int nbWorkers, int queueSize)
144*27162e4eSAndroid Build Coastguard Worker {
145*27162e4eSAndroid Build Coastguard Worker TPool* pool;
146*27162e4eSAndroid Build Coastguard Worker
147*27162e4eSAndroid Build Coastguard Worker /* parameters sanitization */
148*27162e4eSAndroid Build Coastguard Worker if (nbWorkers <= 0 || queueSize <= 0) return NULL;
149*27162e4eSAndroid Build Coastguard Worker if (nbWorkers>LZ4_NBWORKERS_MAX) nbWorkers=LZ4_NBWORKERS_MAX;
150*27162e4eSAndroid Build Coastguard Worker
151*27162e4eSAndroid Build Coastguard Worker pool = calloc(1, sizeof(TPool));
152*27162e4eSAndroid Build Coastguard Worker if (!pool) return NULL;
153*27162e4eSAndroid Build Coastguard Worker
154*27162e4eSAndroid Build Coastguard Worker /* Create completion port */
155*27162e4eSAndroid Build Coastguard Worker pool->completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nbWorkers);
156*27162e4eSAndroid Build Coastguard Worker if (!pool->completionPort) { goto _cleanup; }
157*27162e4eSAndroid Build Coastguard Worker
158*27162e4eSAndroid Build Coastguard Worker /* Create worker threads */
159*27162e4eSAndroid Build Coastguard Worker pool->nbWorkers = nbWorkers;
160*27162e4eSAndroid Build Coastguard Worker pool->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * nbWorkers);
161*27162e4eSAndroid Build Coastguard Worker if (pool->workerThreads == NULL) { goto _cleanup; }
162*27162e4eSAndroid Build Coastguard Worker
163*27162e4eSAndroid Build Coastguard Worker { int i;
164*27162e4eSAndroid Build Coastguard Worker for (i = 0; i < nbWorkers; i++) {
165*27162e4eSAndroid Build Coastguard Worker pool->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, pool, 0, NULL);
166*27162e4eSAndroid Build Coastguard Worker if (!pool->workerThreads[i]) { goto _cleanup; }
167*27162e4eSAndroid Build Coastguard Worker }
168*27162e4eSAndroid Build Coastguard Worker }
169*27162e4eSAndroid Build Coastguard Worker
170*27162e4eSAndroid Build Coastguard Worker /* Initialize sync objects members */
171*27162e4eSAndroid Build Coastguard Worker pool->queueSize = queueSize;
172*27162e4eSAndroid Build Coastguard Worker pool->nbPendingJobs = 0;
173*27162e4eSAndroid Build Coastguard Worker
174*27162e4eSAndroid Build Coastguard Worker pool->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL);
175*27162e4eSAndroid Build Coastguard Worker if (!pool->jobSlotAvail) { goto _cleanup; }
176*27162e4eSAndroid Build Coastguard Worker
177*27162e4eSAndroid Build Coastguard Worker pool->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL);
178*27162e4eSAndroid Build Coastguard Worker if (!pool->allJobsCompleted) { goto _cleanup; }
179*27162e4eSAndroid Build Coastguard Worker
180*27162e4eSAndroid Build Coastguard Worker return pool;
181*27162e4eSAndroid Build Coastguard Worker
182*27162e4eSAndroid Build Coastguard Worker _cleanup:
183*27162e4eSAndroid Build Coastguard Worker TPool_free(pool);
184*27162e4eSAndroid Build Coastguard Worker return NULL;
185*27162e4eSAndroid Build Coastguard Worker }
186*27162e4eSAndroid Build Coastguard Worker
187*27162e4eSAndroid Build Coastguard Worker
TPool_submitJob(TPool * pool,void (* job_function)(void *),void * arg)188*27162e4eSAndroid Build Coastguard Worker void TPool_submitJob(TPool* pool, void (*job_function)(void*), void* arg)
189*27162e4eSAndroid Build Coastguard Worker {
190*27162e4eSAndroid Build Coastguard Worker assert(pool);
191*27162e4eSAndroid Build Coastguard Worker
192*27162e4eSAndroid Build Coastguard Worker /* Atomically increment pending jobs and check for overflow */
193*27162e4eSAndroid Build Coastguard Worker WaitForSingleObject(pool->jobSlotAvail, INFINITE);
194*27162e4eSAndroid Build Coastguard Worker ResetEvent(pool->allJobsCompleted);
195*27162e4eSAndroid Build Coastguard Worker InterlockedIncrement(&pool->nbPendingJobs);
196*27162e4eSAndroid Build Coastguard Worker
197*27162e4eSAndroid Build Coastguard Worker /* Post the job directly to the completion port */
198*27162e4eSAndroid Build Coastguard Worker PostQueuedCompletionStatus(pool->completionPort,
199*27162e4eSAndroid Build Coastguard Worker 0, /* Bytes transferred not used */
200*27162e4eSAndroid Build Coastguard Worker (ULONG_PTR)job_function, /* Store function pointer in completionKey */
201*27162e4eSAndroid Build Coastguard Worker (LPOVERLAPPED)arg); /* Store argument in overlapped */
202*27162e4eSAndroid Build Coastguard Worker }
203*27162e4eSAndroid Build Coastguard Worker
TPool_jobsCompleted(TPool * pool)204*27162e4eSAndroid Build Coastguard Worker void TPool_jobsCompleted(TPool* pool)
205*27162e4eSAndroid Build Coastguard Worker {
206*27162e4eSAndroid Build Coastguard Worker assert(pool);
207*27162e4eSAndroid Build Coastguard Worker WaitForSingleObject(pool->allJobsCompleted, INFINITE);
208*27162e4eSAndroid Build Coastguard Worker }
209*27162e4eSAndroid Build Coastguard Worker
210*27162e4eSAndroid Build Coastguard Worker #else
211*27162e4eSAndroid Build Coastguard Worker
212*27162e4eSAndroid Build Coastguard Worker /* pthread availability assumed */
213*27162e4eSAndroid Build Coastguard Worker #include <stdlib.h> /* malloc, free */
214*27162e4eSAndroid Build Coastguard Worker #include <pthread.h> /* pthread_* */
215*27162e4eSAndroid Build Coastguard Worker
216*27162e4eSAndroid Build Coastguard Worker /* A job is just a function with an opaque argument */
217*27162e4eSAndroid Build Coastguard Worker typedef struct TPool_job_s {
218*27162e4eSAndroid Build Coastguard Worker void (*job_function)(void*);
219*27162e4eSAndroid Build Coastguard Worker void *arg;
220*27162e4eSAndroid Build Coastguard Worker } TPool_job;
221*27162e4eSAndroid Build Coastguard Worker
222*27162e4eSAndroid Build Coastguard Worker struct TPool_s {
223*27162e4eSAndroid Build Coastguard Worker pthread_t* threads;
224*27162e4eSAndroid Build Coastguard Worker size_t threadCapacity;
225*27162e4eSAndroid Build Coastguard Worker size_t threadLimit;
226*27162e4eSAndroid Build Coastguard Worker
227*27162e4eSAndroid Build Coastguard Worker /* The queue is a circular buffer */
228*27162e4eSAndroid Build Coastguard Worker TPool_job* queue;
229*27162e4eSAndroid Build Coastguard Worker size_t queueHead;
230*27162e4eSAndroid Build Coastguard Worker size_t queueTail;
231*27162e4eSAndroid Build Coastguard Worker size_t queueSize;
232*27162e4eSAndroid Build Coastguard Worker
233*27162e4eSAndroid Build Coastguard Worker /* The number of threads working on jobs */
234*27162e4eSAndroid Build Coastguard Worker size_t numThreadsBusy;
235*27162e4eSAndroid Build Coastguard Worker /* Indicates if the queue is empty */
236*27162e4eSAndroid Build Coastguard Worker int queueEmpty;
237*27162e4eSAndroid Build Coastguard Worker
238*27162e4eSAndroid Build Coastguard Worker /* The mutex protects the queue */
239*27162e4eSAndroid Build Coastguard Worker pthread_mutex_t queueMutex;
240*27162e4eSAndroid Build Coastguard Worker /* Condition variable for pushers to wait on when the queue is full */
241*27162e4eSAndroid Build Coastguard Worker pthread_cond_t queuePushCond;
242*27162e4eSAndroid Build Coastguard Worker /* Condition variables for poppers to wait on when the queue is empty */
243*27162e4eSAndroid Build Coastguard Worker pthread_cond_t queuePopCond;
244*27162e4eSAndroid Build Coastguard Worker /* Indicates if the queue is shutting down */
245*27162e4eSAndroid Build Coastguard Worker int shutdown;
246*27162e4eSAndroid Build Coastguard Worker };
247*27162e4eSAndroid Build Coastguard Worker
248*27162e4eSAndroid Build Coastguard Worker static void TPool_shutdown(TPool* ctx);
249*27162e4eSAndroid Build Coastguard Worker
TPool_free(TPool * ctx)250*27162e4eSAndroid Build Coastguard Worker void TPool_free(TPool* ctx) {
251*27162e4eSAndroid Build Coastguard Worker if (!ctx) { return; }
252*27162e4eSAndroid Build Coastguard Worker TPool_shutdown(ctx);
253*27162e4eSAndroid Build Coastguard Worker pthread_mutex_destroy(&ctx->queueMutex);
254*27162e4eSAndroid Build Coastguard Worker pthread_cond_destroy(&ctx->queuePushCond);
255*27162e4eSAndroid Build Coastguard Worker pthread_cond_destroy(&ctx->queuePopCond);
256*27162e4eSAndroid Build Coastguard Worker free(ctx->queue);
257*27162e4eSAndroid Build Coastguard Worker free(ctx->threads);
258*27162e4eSAndroid Build Coastguard Worker free(ctx);
259*27162e4eSAndroid Build Coastguard Worker }
260*27162e4eSAndroid Build Coastguard Worker
261*27162e4eSAndroid Build Coastguard Worker static void* TPool_thread(void* opaque);
262*27162e4eSAndroid Build Coastguard Worker
TPool_create(int nbThreads,int queueSize)263*27162e4eSAndroid Build Coastguard Worker TPool* TPool_create(int nbThreads, int queueSize)
264*27162e4eSAndroid Build Coastguard Worker {
265*27162e4eSAndroid Build Coastguard Worker TPool* ctx;
266*27162e4eSAndroid Build Coastguard Worker /* Check parameters */
267*27162e4eSAndroid Build Coastguard Worker if (nbThreads<1 || queueSize<1) { return NULL; }
268*27162e4eSAndroid Build Coastguard Worker /* Allocate the context and zero initialize */
269*27162e4eSAndroid Build Coastguard Worker ctx = (TPool*)calloc(1, sizeof(TPool));
270*27162e4eSAndroid Build Coastguard Worker if (!ctx) { return NULL; }
271*27162e4eSAndroid Build Coastguard Worker /* init pthread variables */
272*27162e4eSAndroid Build Coastguard Worker { int error = 0;
273*27162e4eSAndroid Build Coastguard Worker error |= pthread_mutex_init(&ctx->queueMutex, NULL);
274*27162e4eSAndroid Build Coastguard Worker error |= pthread_cond_init(&ctx->queuePushCond, NULL);
275*27162e4eSAndroid Build Coastguard Worker error |= pthread_cond_init(&ctx->queuePopCond, NULL);
276*27162e4eSAndroid Build Coastguard Worker if (error) { TPool_free(ctx); return NULL; }
277*27162e4eSAndroid Build Coastguard Worker }
278*27162e4eSAndroid Build Coastguard Worker /* Initialize the job queue.
279*27162e4eSAndroid Build Coastguard Worker * It needs one extra space since one space is wasted to differentiate
280*27162e4eSAndroid Build Coastguard Worker * empty and full queues.
281*27162e4eSAndroid Build Coastguard Worker */
282*27162e4eSAndroid Build Coastguard Worker ctx->queueSize = (size_t)queueSize + 1;
283*27162e4eSAndroid Build Coastguard Worker ctx->queue = (TPool_job*)calloc(1, ctx->queueSize * sizeof(TPool_job));
284*27162e4eSAndroid Build Coastguard Worker if (ctx->queue == NULL) {
285*27162e4eSAndroid Build Coastguard Worker TPool_free(ctx);
286*27162e4eSAndroid Build Coastguard Worker return NULL;
287*27162e4eSAndroid Build Coastguard Worker }
288*27162e4eSAndroid Build Coastguard Worker ctx->queueHead = 0;
289*27162e4eSAndroid Build Coastguard Worker ctx->queueTail = 0;
290*27162e4eSAndroid Build Coastguard Worker ctx->numThreadsBusy = 0;
291*27162e4eSAndroid Build Coastguard Worker ctx->queueEmpty = 1;
292*27162e4eSAndroid Build Coastguard Worker ctx->shutdown = 0;
293*27162e4eSAndroid Build Coastguard Worker /* Allocate space for the thread handles */
294*27162e4eSAndroid Build Coastguard Worker ctx->threads = (pthread_t*)calloc(1, (size_t)nbThreads * sizeof(pthread_t));
295*27162e4eSAndroid Build Coastguard Worker if (ctx->threads == NULL) {
296*27162e4eSAndroid Build Coastguard Worker TPool_free(ctx);
297*27162e4eSAndroid Build Coastguard Worker return NULL;
298*27162e4eSAndroid Build Coastguard Worker }
299*27162e4eSAndroid Build Coastguard Worker ctx->threadCapacity = 0;
300*27162e4eSAndroid Build Coastguard Worker /* Initialize the threads */
301*27162e4eSAndroid Build Coastguard Worker { int i;
302*27162e4eSAndroid Build Coastguard Worker for (i = 0; i < nbThreads; ++i) {
303*27162e4eSAndroid Build Coastguard Worker if (pthread_create(&ctx->threads[i], NULL, &TPool_thread, ctx)) {
304*27162e4eSAndroid Build Coastguard Worker ctx->threadCapacity = (size_t)i;
305*27162e4eSAndroid Build Coastguard Worker TPool_free(ctx);
306*27162e4eSAndroid Build Coastguard Worker return NULL;
307*27162e4eSAndroid Build Coastguard Worker } }
308*27162e4eSAndroid Build Coastguard Worker ctx->threadCapacity = (size_t)nbThreads;
309*27162e4eSAndroid Build Coastguard Worker ctx->threadLimit = (size_t)nbThreads;
310*27162e4eSAndroid Build Coastguard Worker }
311*27162e4eSAndroid Build Coastguard Worker return ctx;
312*27162e4eSAndroid Build Coastguard Worker }
313*27162e4eSAndroid Build Coastguard Worker
314*27162e4eSAndroid Build Coastguard Worker /* TPool_thread() :
315*27162e4eSAndroid Build Coastguard Worker * Work thread for the thread pool.
316*27162e4eSAndroid Build Coastguard Worker * Waits for jobs and executes them.
317*27162e4eSAndroid Build Coastguard Worker * @returns : NULL on failure else non-null.
318*27162e4eSAndroid Build Coastguard Worker */
TPool_thread(void * opaque)319*27162e4eSAndroid Build Coastguard Worker static void* TPool_thread(void* opaque) {
320*27162e4eSAndroid Build Coastguard Worker TPool* const ctx = (TPool*)opaque;
321*27162e4eSAndroid Build Coastguard Worker if (!ctx) { return NULL; }
322*27162e4eSAndroid Build Coastguard Worker for (;;) {
323*27162e4eSAndroid Build Coastguard Worker /* Lock the mutex and wait for a non-empty queue or until shutdown */
324*27162e4eSAndroid Build Coastguard Worker pthread_mutex_lock(&ctx->queueMutex);
325*27162e4eSAndroid Build Coastguard Worker
326*27162e4eSAndroid Build Coastguard Worker while ( ctx->queueEmpty
327*27162e4eSAndroid Build Coastguard Worker || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
328*27162e4eSAndroid Build Coastguard Worker if (ctx->shutdown) {
329*27162e4eSAndroid Build Coastguard Worker /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
330*27162e4eSAndroid Build Coastguard Worker * a few threads will be shutdown while !queueEmpty,
331*27162e4eSAndroid Build Coastguard Worker * but enough threads will remain active to finish the queue */
332*27162e4eSAndroid Build Coastguard Worker pthread_mutex_unlock(&ctx->queueMutex);
333*27162e4eSAndroid Build Coastguard Worker return opaque;
334*27162e4eSAndroid Build Coastguard Worker }
335*27162e4eSAndroid Build Coastguard Worker pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
336*27162e4eSAndroid Build Coastguard Worker }
337*27162e4eSAndroid Build Coastguard Worker /* Pop a job off the queue */
338*27162e4eSAndroid Build Coastguard Worker { TPool_job const job = ctx->queue[ctx->queueHead];
339*27162e4eSAndroid Build Coastguard Worker ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
340*27162e4eSAndroid Build Coastguard Worker ctx->numThreadsBusy++;
341*27162e4eSAndroid Build Coastguard Worker ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
342*27162e4eSAndroid Build Coastguard Worker /* Unlock the mutex, signal a pusher, and run the job */
343*27162e4eSAndroid Build Coastguard Worker pthread_cond_signal(&ctx->queuePushCond);
344*27162e4eSAndroid Build Coastguard Worker pthread_mutex_unlock(&ctx->queueMutex);
345*27162e4eSAndroid Build Coastguard Worker
346*27162e4eSAndroid Build Coastguard Worker job.job_function(job.arg);
347*27162e4eSAndroid Build Coastguard Worker
348*27162e4eSAndroid Build Coastguard Worker /* If the intended queue size was 0, signal after finishing job */
349*27162e4eSAndroid Build Coastguard Worker pthread_mutex_lock(&ctx->queueMutex);
350*27162e4eSAndroid Build Coastguard Worker ctx->numThreadsBusy--;
351*27162e4eSAndroid Build Coastguard Worker pthread_cond_signal(&ctx->queuePushCond);
352*27162e4eSAndroid Build Coastguard Worker pthread_mutex_unlock(&ctx->queueMutex);
353*27162e4eSAndroid Build Coastguard Worker }
354*27162e4eSAndroid Build Coastguard Worker } /* for (;;) */
355*27162e4eSAndroid Build Coastguard Worker assert(0); /* Unreachable */
356*27162e4eSAndroid Build Coastguard Worker }
357*27162e4eSAndroid Build Coastguard Worker
358*27162e4eSAndroid Build Coastguard Worker /*! TPool_shutdown() :
359*27162e4eSAndroid Build Coastguard Worker Shutdown the queue, wake any sleeping threads, and join all of the threads.
360*27162e4eSAndroid Build Coastguard Worker */
TPool_shutdown(TPool * ctx)361*27162e4eSAndroid Build Coastguard Worker static void TPool_shutdown(TPool* ctx) {
362*27162e4eSAndroid Build Coastguard Worker /* Shut down the queue */
363*27162e4eSAndroid Build Coastguard Worker pthread_mutex_lock(&ctx->queueMutex);
364*27162e4eSAndroid Build Coastguard Worker ctx->shutdown = 1;
365*27162e4eSAndroid Build Coastguard Worker pthread_mutex_unlock(&ctx->queueMutex);
366*27162e4eSAndroid Build Coastguard Worker /* Wake up sleeping threads */
367*27162e4eSAndroid Build Coastguard Worker pthread_cond_broadcast(&ctx->queuePushCond);
368*27162e4eSAndroid Build Coastguard Worker pthread_cond_broadcast(&ctx->queuePopCond);
369*27162e4eSAndroid Build Coastguard Worker /* Join all of the threads */
370*27162e4eSAndroid Build Coastguard Worker { size_t i;
371*27162e4eSAndroid Build Coastguard Worker for (i = 0; i < ctx->threadCapacity; ++i) {
372*27162e4eSAndroid Build Coastguard Worker pthread_join(ctx->threads[i], NULL); /* note : could fail */
373*27162e4eSAndroid Build Coastguard Worker } }
374*27162e4eSAndroid Build Coastguard Worker }
375*27162e4eSAndroid Build Coastguard Worker
376*27162e4eSAndroid Build Coastguard Worker
377*27162e4eSAndroid Build Coastguard Worker /*! TPool_jobsCompleted() :
378*27162e4eSAndroid Build Coastguard Worker * Waits for all queued jobs to finish executing.
379*27162e4eSAndroid Build Coastguard Worker */
TPool_jobsCompleted(TPool * ctx)380*27162e4eSAndroid Build Coastguard Worker void TPool_jobsCompleted(TPool* ctx){
381*27162e4eSAndroid Build Coastguard Worker pthread_mutex_lock(&ctx->queueMutex);
382*27162e4eSAndroid Build Coastguard Worker while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
383*27162e4eSAndroid Build Coastguard Worker pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
384*27162e4eSAndroid Build Coastguard Worker }
385*27162e4eSAndroid Build Coastguard Worker pthread_mutex_unlock(&ctx->queueMutex);
386*27162e4eSAndroid Build Coastguard Worker }
387*27162e4eSAndroid Build Coastguard Worker
388*27162e4eSAndroid Build Coastguard Worker /**
389*27162e4eSAndroid Build Coastguard Worker * Returns 1 if the queue is full and 0 otherwise.
390*27162e4eSAndroid Build Coastguard Worker *
391*27162e4eSAndroid Build Coastguard Worker * When queueSize is 1 (pool was created with an intended queueSize of 0),
392*27162e4eSAndroid Build Coastguard Worker * then a queue is empty if there is a thread free _and_ no job is waiting.
393*27162e4eSAndroid Build Coastguard Worker */
isQueueFull(TPool const * ctx)394*27162e4eSAndroid Build Coastguard Worker static int isQueueFull(TPool const* ctx) {
395*27162e4eSAndroid Build Coastguard Worker if (ctx->queueSize > 1) {
396*27162e4eSAndroid Build Coastguard Worker return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
397*27162e4eSAndroid Build Coastguard Worker } else {
398*27162e4eSAndroid Build Coastguard Worker return (ctx->numThreadsBusy == ctx->threadLimit) ||
399*27162e4eSAndroid Build Coastguard Worker !ctx->queueEmpty;
400*27162e4eSAndroid Build Coastguard Worker }
401*27162e4eSAndroid Build Coastguard Worker }
402*27162e4eSAndroid Build Coastguard Worker
403*27162e4eSAndroid Build Coastguard Worker static void
TPool_submitJob_internal(TPool * ctx,void (* job_function)(void *),void * arg)404*27162e4eSAndroid Build Coastguard Worker TPool_submitJob_internal(TPool* ctx, void (*job_function)(void*), void *arg)
405*27162e4eSAndroid Build Coastguard Worker {
406*27162e4eSAndroid Build Coastguard Worker TPool_job job;
407*27162e4eSAndroid Build Coastguard Worker job.job_function = job_function;
408*27162e4eSAndroid Build Coastguard Worker job.arg = arg;
409*27162e4eSAndroid Build Coastguard Worker assert(ctx != NULL);
410*27162e4eSAndroid Build Coastguard Worker if (ctx->shutdown) return;
411*27162e4eSAndroid Build Coastguard Worker
412*27162e4eSAndroid Build Coastguard Worker ctx->queueEmpty = 0;
413*27162e4eSAndroid Build Coastguard Worker ctx->queue[ctx->queueTail] = job;
414*27162e4eSAndroid Build Coastguard Worker ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
415*27162e4eSAndroid Build Coastguard Worker pthread_cond_signal(&ctx->queuePopCond);
416*27162e4eSAndroid Build Coastguard Worker }
417*27162e4eSAndroid Build Coastguard Worker
TPool_submitJob(TPool * ctx,void (* job_function)(void *),void * arg)418*27162e4eSAndroid Build Coastguard Worker void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
419*27162e4eSAndroid Build Coastguard Worker {
420*27162e4eSAndroid Build Coastguard Worker assert(ctx != NULL);
421*27162e4eSAndroid Build Coastguard Worker pthread_mutex_lock(&ctx->queueMutex);
422*27162e4eSAndroid Build Coastguard Worker /* Wait until there is space in the queue for the new job */
423*27162e4eSAndroid Build Coastguard Worker while (isQueueFull(ctx) && (!ctx->shutdown)) {
424*27162e4eSAndroid Build Coastguard Worker pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
425*27162e4eSAndroid Build Coastguard Worker }
426*27162e4eSAndroid Build Coastguard Worker TPool_submitJob_internal(ctx, job_function, arg);
427*27162e4eSAndroid Build Coastguard Worker pthread_mutex_unlock(&ctx->queueMutex);
428*27162e4eSAndroid Build Coastguard Worker }
429*27162e4eSAndroid Build Coastguard Worker
430*27162e4eSAndroid Build Coastguard Worker #endif /* LZ4IO_NO_MT */
431