1*01826a49SYabin Cui /*
2*01826a49SYabin Cui * Copyright (c) Meta Platforms, Inc. and affiliates.
3*01826a49SYabin Cui * All rights reserved.
4*01826a49SYabin Cui *
5*01826a49SYabin Cui * This source code is licensed under both the BSD-style license (found in the
6*01826a49SYabin Cui * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7*01826a49SYabin Cui * in the COPYING file in the root directory of this source tree).
8*01826a49SYabin Cui * You may select, at your option, one of the above-listed licenses.
9*01826a49SYabin Cui */
10*01826a49SYabin Cui
11*01826a49SYabin Cui
12*01826a49SYabin Cui #include "pool.h"
13*01826a49SYabin Cui #include "threading.h"
14*01826a49SYabin Cui #include "util.h"
15*01826a49SYabin Cui #include "timefn.h"
16*01826a49SYabin Cui #include <stddef.h>
17*01826a49SYabin Cui #include <stdio.h>
18*01826a49SYabin Cui
19*01826a49SYabin Cui #define ASSERT_TRUE(p) \
20*01826a49SYabin Cui do { \
21*01826a49SYabin Cui if (!(p)) { \
22*01826a49SYabin Cui return 1; \
23*01826a49SYabin Cui } \
24*01826a49SYabin Cui } while (0)
25*01826a49SYabin Cui #define ASSERT_FALSE(p) ASSERT_TRUE(!(p))
26*01826a49SYabin Cui #define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs))
27*01826a49SYabin Cui
28*01826a49SYabin Cui struct data {
29*01826a49SYabin Cui ZSTD_pthread_mutex_t mutex;
30*01826a49SYabin Cui unsigned data[16];
31*01826a49SYabin Cui size_t i;
32*01826a49SYabin Cui };
33*01826a49SYabin Cui
fn(void * opaque)34*01826a49SYabin Cui static void fn(void *opaque)
35*01826a49SYabin Cui {
36*01826a49SYabin Cui struct data *data = (struct data *)opaque;
37*01826a49SYabin Cui ZSTD_pthread_mutex_lock(&data->mutex);
38*01826a49SYabin Cui data->data[data->i] = (unsigned)(data->i);
39*01826a49SYabin Cui ++data->i;
40*01826a49SYabin Cui ZSTD_pthread_mutex_unlock(&data->mutex);
41*01826a49SYabin Cui }
42*01826a49SYabin Cui
testOrder(size_t numThreads,size_t queueSize)43*01826a49SYabin Cui static int testOrder(size_t numThreads, size_t queueSize)
44*01826a49SYabin Cui {
45*01826a49SYabin Cui struct data data;
46*01826a49SYabin Cui POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
47*01826a49SYabin Cui ASSERT_TRUE(ctx);
48*01826a49SYabin Cui data.i = 0;
49*01826a49SYabin Cui ASSERT_FALSE(ZSTD_pthread_mutex_init(&data.mutex, NULL));
50*01826a49SYabin Cui { size_t i;
51*01826a49SYabin Cui for (i = 0; i < 16; ++i) {
52*01826a49SYabin Cui POOL_add(ctx, &fn, &data);
53*01826a49SYabin Cui }
54*01826a49SYabin Cui }
55*01826a49SYabin Cui POOL_free(ctx);
56*01826a49SYabin Cui ASSERT_EQ(16, data.i);
57*01826a49SYabin Cui { size_t i;
58*01826a49SYabin Cui for (i = 0; i < data.i; ++i) {
59*01826a49SYabin Cui ASSERT_EQ(i, data.data[i]);
60*01826a49SYabin Cui }
61*01826a49SYabin Cui }
62*01826a49SYabin Cui ZSTD_pthread_mutex_destroy(&data.mutex);
63*01826a49SYabin Cui return 0;
64*01826a49SYabin Cui }
65*01826a49SYabin Cui
66*01826a49SYabin Cui
67*01826a49SYabin Cui /* --- test deadlocks --- */
68*01826a49SYabin Cui
waitFn(void * opaque)69*01826a49SYabin Cui static void waitFn(void *opaque) {
70*01826a49SYabin Cui (void)opaque;
71*01826a49SYabin Cui UTIL_sleepMilli(1);
72*01826a49SYabin Cui }
73*01826a49SYabin Cui
74*01826a49SYabin Cui /* Tests for deadlock */
testWait(size_t numThreads,size_t queueSize)75*01826a49SYabin Cui static int testWait(size_t numThreads, size_t queueSize) {
76*01826a49SYabin Cui struct data data;
77*01826a49SYabin Cui POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
78*01826a49SYabin Cui ASSERT_TRUE(ctx);
79*01826a49SYabin Cui { size_t i;
80*01826a49SYabin Cui for (i = 0; i < 16; ++i) {
81*01826a49SYabin Cui POOL_add(ctx, &waitFn, &data);
82*01826a49SYabin Cui }
83*01826a49SYabin Cui }
84*01826a49SYabin Cui POOL_free(ctx);
85*01826a49SYabin Cui return 0;
86*01826a49SYabin Cui }
87*01826a49SYabin Cui
88*01826a49SYabin Cui
89*01826a49SYabin Cui /* --- test POOL_resize() --- */
90*01826a49SYabin Cui
91*01826a49SYabin Cui typedef struct {
92*01826a49SYabin Cui ZSTD_pthread_mutex_t mut;
93*01826a49SYabin Cui int countdown;
94*01826a49SYabin Cui int val;
95*01826a49SYabin Cui int max;
96*01826a49SYabin Cui ZSTD_pthread_cond_t cond;
97*01826a49SYabin Cui } poolTest_t;
98*01826a49SYabin Cui
waitLongFn(void * opaque)99*01826a49SYabin Cui static void waitLongFn(void *opaque) {
100*01826a49SYabin Cui poolTest_t* const test = (poolTest_t*) opaque;
101*01826a49SYabin Cui ZSTD_pthread_mutex_lock(&test->mut);
102*01826a49SYabin Cui test->val++;
103*01826a49SYabin Cui if (test->val > test->max)
104*01826a49SYabin Cui test->max = test->val;
105*01826a49SYabin Cui ZSTD_pthread_mutex_unlock(&test->mut);
106*01826a49SYabin Cui
107*01826a49SYabin Cui UTIL_sleepMilli(10);
108*01826a49SYabin Cui
109*01826a49SYabin Cui ZSTD_pthread_mutex_lock(&test->mut);
110*01826a49SYabin Cui test->val--;
111*01826a49SYabin Cui test->countdown--;
112*01826a49SYabin Cui if (test->countdown == 0)
113*01826a49SYabin Cui ZSTD_pthread_cond_signal(&test->cond);
114*01826a49SYabin Cui ZSTD_pthread_mutex_unlock(&test->mut);
115*01826a49SYabin Cui }
116*01826a49SYabin Cui
testThreadReduction_internal(POOL_ctx * ctx,poolTest_t test)117*01826a49SYabin Cui static int testThreadReduction_internal(POOL_ctx* ctx, poolTest_t test)
118*01826a49SYabin Cui {
119*01826a49SYabin Cui int const nbWaits = 16;
120*01826a49SYabin Cui
121*01826a49SYabin Cui test.countdown = nbWaits;
122*01826a49SYabin Cui test.val = 0;
123*01826a49SYabin Cui test.max = 0;
124*01826a49SYabin Cui
125*01826a49SYabin Cui { int i;
126*01826a49SYabin Cui for (i=0; i<nbWaits; i++)
127*01826a49SYabin Cui POOL_add(ctx, &waitLongFn, &test);
128*01826a49SYabin Cui }
129*01826a49SYabin Cui ZSTD_pthread_mutex_lock(&test.mut);
130*01826a49SYabin Cui while (test.countdown > 0)
131*01826a49SYabin Cui ZSTD_pthread_cond_wait(&test.cond, &test.mut);
132*01826a49SYabin Cui ASSERT_EQ(test.val, 0);
133*01826a49SYabin Cui ASSERT_EQ(test.max, 4);
134*01826a49SYabin Cui ZSTD_pthread_mutex_unlock(&test.mut);
135*01826a49SYabin Cui
136*01826a49SYabin Cui ASSERT_EQ( POOL_resize(ctx, 2/*nbThreads*/) , 0 );
137*01826a49SYabin Cui test.countdown = nbWaits;
138*01826a49SYabin Cui test.val = 0;
139*01826a49SYabin Cui test.max = 0;
140*01826a49SYabin Cui { int i;
141*01826a49SYabin Cui for (i=0; i<nbWaits; i++)
142*01826a49SYabin Cui POOL_add(ctx, &waitLongFn, &test);
143*01826a49SYabin Cui }
144*01826a49SYabin Cui ZSTD_pthread_mutex_lock(&test.mut);
145*01826a49SYabin Cui while (test.countdown > 0)
146*01826a49SYabin Cui ZSTD_pthread_cond_wait(&test.cond, &test.mut);
147*01826a49SYabin Cui ASSERT_EQ(test.val, 0);
148*01826a49SYabin Cui ASSERT_EQ(test.max, 2);
149*01826a49SYabin Cui ZSTD_pthread_mutex_unlock(&test.mut);
150*01826a49SYabin Cui
151*01826a49SYabin Cui return 0;
152*01826a49SYabin Cui }
153*01826a49SYabin Cui
testThreadReduction(void)154*01826a49SYabin Cui static int testThreadReduction(void) {
155*01826a49SYabin Cui int result;
156*01826a49SYabin Cui poolTest_t test;
157*01826a49SYabin Cui POOL_ctx* const ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
158*01826a49SYabin Cui
159*01826a49SYabin Cui ASSERT_TRUE(ctx);
160*01826a49SYabin Cui
161*01826a49SYabin Cui memset(&test, 0, sizeof(test));
162*01826a49SYabin Cui ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
163*01826a49SYabin Cui ASSERT_FALSE( ZSTD_pthread_cond_init(&test.cond, NULL) );
164*01826a49SYabin Cui
165*01826a49SYabin Cui result = testThreadReduction_internal(ctx, test);
166*01826a49SYabin Cui
167*01826a49SYabin Cui ZSTD_pthread_mutex_destroy(&test.mut);
168*01826a49SYabin Cui ZSTD_pthread_cond_destroy(&test.cond);
169*01826a49SYabin Cui POOL_free(ctx);
170*01826a49SYabin Cui
171*01826a49SYabin Cui return result;
172*01826a49SYabin Cui }
173*01826a49SYabin Cui
174*01826a49SYabin Cui
175*01826a49SYabin Cui /* --- test abrupt ending --- */
176*01826a49SYabin Cui
177*01826a49SYabin Cui typedef struct {
178*01826a49SYabin Cui ZSTD_pthread_mutex_t mut;
179*01826a49SYabin Cui int val;
180*01826a49SYabin Cui } abruptEndCanary_t;
181*01826a49SYabin Cui
waitIncFn(void * opaque)182*01826a49SYabin Cui static void waitIncFn(void *opaque) {
183*01826a49SYabin Cui abruptEndCanary_t* test = (abruptEndCanary_t*) opaque;
184*01826a49SYabin Cui UTIL_sleepMilli(10);
185*01826a49SYabin Cui ZSTD_pthread_mutex_lock(&test->mut);
186*01826a49SYabin Cui test->val = test->val + 1;
187*01826a49SYabin Cui ZSTD_pthread_mutex_unlock(&test->mut);
188*01826a49SYabin Cui }
189*01826a49SYabin Cui
testAbruptEnding_internal(abruptEndCanary_t test)190*01826a49SYabin Cui static int testAbruptEnding_internal(abruptEndCanary_t test)
191*01826a49SYabin Cui {
192*01826a49SYabin Cui int const nbWaits = 16;
193*01826a49SYabin Cui
194*01826a49SYabin Cui POOL_ctx* const ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
195*01826a49SYabin Cui ASSERT_TRUE(ctx);
196*01826a49SYabin Cui test.val = 0;
197*01826a49SYabin Cui
198*01826a49SYabin Cui { int i;
199*01826a49SYabin Cui for (i=0; i<nbWaits; i++)
200*01826a49SYabin Cui POOL_add(ctx, &waitIncFn, &test); /* all jobs pushed into queue */
201*01826a49SYabin Cui }
202*01826a49SYabin Cui ASSERT_EQ( POOL_resize(ctx, 1 /*numThreads*/) , 0 ); /* downsize numThreads, to try to break end condition */
203*01826a49SYabin Cui
204*01826a49SYabin Cui POOL_free(ctx); /* must finish all jobs in queue before giving back control */
205*01826a49SYabin Cui ASSERT_EQ(test.val, nbWaits);
206*01826a49SYabin Cui return 0;
207*01826a49SYabin Cui }
208*01826a49SYabin Cui
testAbruptEnding(void)209*01826a49SYabin Cui static int testAbruptEnding(void) {
210*01826a49SYabin Cui int result;
211*01826a49SYabin Cui abruptEndCanary_t test;
212*01826a49SYabin Cui
213*01826a49SYabin Cui memset(&test, 0, sizeof(test));
214*01826a49SYabin Cui ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
215*01826a49SYabin Cui
216*01826a49SYabin Cui result = testAbruptEnding_internal(test);
217*01826a49SYabin Cui
218*01826a49SYabin Cui ZSTD_pthread_mutex_destroy(&test.mut);
219*01826a49SYabin Cui return result;
220*01826a49SYabin Cui }
221*01826a49SYabin Cui
222*01826a49SYabin Cui
223*01826a49SYabin Cui
224*01826a49SYabin Cui /* --- test launcher --- */
225*01826a49SYabin Cui
main(int argc,const char ** argv)226*01826a49SYabin Cui int main(int argc, const char **argv) {
227*01826a49SYabin Cui size_t numThreads;
228*01826a49SYabin Cui (void)argc;
229*01826a49SYabin Cui (void)argv;
230*01826a49SYabin Cui
231*01826a49SYabin Cui if (POOL_create(0, 1)) { /* should not be possible */
232*01826a49SYabin Cui printf("FAIL: should not create POOL with 0 threads\n");
233*01826a49SYabin Cui return 1;
234*01826a49SYabin Cui }
235*01826a49SYabin Cui
236*01826a49SYabin Cui for (numThreads = 1; numThreads <= 4; ++numThreads) {
237*01826a49SYabin Cui size_t queueSize;
238*01826a49SYabin Cui for (queueSize = 0; queueSize <= 2; ++queueSize) {
239*01826a49SYabin Cui printf("queueSize==%u, numThreads=%u \n",
240*01826a49SYabin Cui (unsigned)queueSize, (unsigned)numThreads);
241*01826a49SYabin Cui if (testOrder(numThreads, queueSize)) {
242*01826a49SYabin Cui printf("FAIL: testOrder\n");
243*01826a49SYabin Cui return 1;
244*01826a49SYabin Cui }
245*01826a49SYabin Cui printf("SUCCESS: testOrder\n");
246*01826a49SYabin Cui if (testWait(numThreads, queueSize)) {
247*01826a49SYabin Cui printf("FAIL: testWait\n");
248*01826a49SYabin Cui return 1;
249*01826a49SYabin Cui }
250*01826a49SYabin Cui printf("SUCCESS: testWait\n");
251*01826a49SYabin Cui }
252*01826a49SYabin Cui }
253*01826a49SYabin Cui
254*01826a49SYabin Cui if (testThreadReduction()) {
255*01826a49SYabin Cui printf("FAIL: thread reduction not effective \n");
256*01826a49SYabin Cui return 1;
257*01826a49SYabin Cui } else {
258*01826a49SYabin Cui printf("SUCCESS: thread reduction effective \n");
259*01826a49SYabin Cui }
260*01826a49SYabin Cui
261*01826a49SYabin Cui if (testAbruptEnding()) {
262*01826a49SYabin Cui printf("FAIL: jobs in queue not completed on early end \n");
263*01826a49SYabin Cui return 1;
264*01826a49SYabin Cui } else {
265*01826a49SYabin Cui printf("SUCCESS: all jobs in queue completed on early end \n");
266*01826a49SYabin Cui }
267*01826a49SYabin Cui
268*01826a49SYabin Cui printf("PASS: all POOL tests\n");
269*01826a49SYabin Cui
270*01826a49SYabin Cui return 0;
271*01826a49SYabin Cui }
272