1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "apr_file_io.h"
18 #include "apr_thread_proc.h"
19 #include "apr_thread_mutex.h"
20 #include "apr_thread_cond.h"
21 #include "apr_errno.h"
22 #include "apr_general.h"
23 #include "apr_atomic.h"
24 #include "testutil.h"
25
/* Size of the thread arrays used by the multi-threaded test cases below. */
#define NTHREADS 10

/* Assert that an APR status value equals APR_SUCCESS.
 * The expansion refers to an identifier named `tc`, so an abts_case
 * pointer with that exact name must be in scope wherever this is used. */
#define ABTS_SUCCESS(rv) ABTS_INT_EQUAL(tc, APR_SUCCESS, rv)
29
30 #if APR_HAS_THREADS
31
32 typedef struct toolbox_t toolbox_t;
33
34 struct toolbox_t {
35 void *data;
36 abts_case *tc;
37 apr_thread_mutex_t *mutex;
38 apr_thread_cond_t *cond;
39 void (*func)(toolbox_t *box);
40 };
41
42 typedef struct toolbox_fnptr_t toolbox_fnptr_t;
43
44 struct toolbox_fnptr_t {
45 void (*func)(toolbox_t *box);
46 };
47
/*
 * Verify that a signal or broadcast issued while nobody is waiting is not
 * remembered: a timed wait started afterwards is expected to time out.
 *
 * tc:   test case used for reporting.
 * data: unused.
 */
static void lost_signal(abts_case *tc, void *data)
{
    apr_thread_mutex_t *lock = NULL;
    apr_thread_cond_t *cv = NULL;
    apr_status_t status;
    int pass;

    status = apr_thread_mutex_create(&lock, APR_THREAD_MUTEX_DEFAULT, p);
    ABTS_SUCCESS(status);
    ABTS_PTR_NOTNULL(tc, lock);

    status = apr_thread_cond_create(&cv, p);
    ABTS_SUCCESS(status);
    ABTS_PTR_NOTNULL(tc, cv);

    /* Pass 0 exercises signal, pass 1 exercises broadcast; everything
     * after the wake-up call is the same for both. */
    for (pass = 0; pass < 2; pass++) {
        if (pass == 0) {
            status = apr_thread_cond_signal(cv);
        }
        else {
            status = apr_thread_cond_broadcast(cv);
        }
        ABTS_SUCCESS(status);

        status = apr_thread_mutex_lock(lock);
        ABTS_SUCCESS(status);

        /* No waiter existed when the wake-up was sent, so this must
         * end with a timeout rather than a successful wake-up. */
        status = apr_thread_cond_timedwait(cv, lock, 10000);
        ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(status));

        status = apr_thread_mutex_unlock(lock);
        ABTS_SUCCESS(status);
    }

    status = apr_thread_cond_destroy(cv);
    ABTS_SUCCESS(status);

    status = apr_thread_mutex_destroy(lock);
    ABTS_SUCCESS(status);
}
92
thread_routine(apr_thread_t * thd,void * data)93 static void *APR_THREAD_FUNC thread_routine(apr_thread_t *thd, void *data)
94 {
95 toolbox_t *box = data;
96
97 box->func(box);
98
99 apr_thread_exit(thd, 0);
100
101 return NULL;
102 }
103
/*
 * Worker body: take the toolbox mutex, signal the toolbox condition
 * variable once, and release the mutex again.
 */
static void lock_and_signal(toolbox_t *box)
{
    abts_case *tc = box->tc;
    apr_thread_mutex_t *lock = box->mutex;
    apr_status_t status;

    status = apr_thread_mutex_lock(lock);
    ABTS_SUCCESS(status);

    status = apr_thread_cond_signal(box->cond);
    ABTS_SUCCESS(status);

    status = apr_thread_mutex_unlock(lock);
    ABTS_SUCCESS(status);
}
118
/*
 * Use one condition variable with NTHREADS different mutexes, one after
 * another. Each worker signals the shared condition variable under its own
 * mutex; the main thread is the only waiter and waits once per mutex.
 * Afterwards a timed wait on every mutex is expected to time out.
 *
 * tc:   test case used for reporting.
 * data: unused.
 */
static void dynamic_binding(abts_case *tc, void *data)
{
    unsigned int i;
    apr_status_t rv;
    toolbox_t box[NTHREADS];
    apr_thread_t *thread[NTHREADS];
    apr_thread_mutex_t *mutex[NTHREADS];
    apr_thread_cond_t *cond = NULL;

    rv = apr_thread_cond_create(&cond, p);
    ABTS_SUCCESS(rv);
    ABTS_PTR_NOTNULL(tc, cond);

    for (i = 0; i < NTHREADS; i++) {
        rv = apr_thread_mutex_create(&mutex[i], APR_THREAD_MUTEX_DEFAULT, p);
        ABTS_SUCCESS(rv);

        /* Hold the mutex before starting the worker so that it cannot
         * signal until the main thread waits on this mutex below. */
        rv = apr_thread_mutex_lock(mutex[i]);
        ABTS_SUCCESS(rv);

        box[i].tc = tc;
        /* lock_and_signal() does not read the payload; set it anyway so
         * no toolbox field is handed to another thread uninitialised. */
        box[i].data = NULL;
        box[i].cond = cond;
        box[i].mutex = mutex[i];
        box[i].func = lock_and_signal;

        rv = apr_thread_create(&thread[i], NULL, thread_routine, &box[i], p);
        ABTS_SUCCESS(rv);
    }

    /*
     * The dynamic binding should be preserved because we use only one waiter
     */

    for (i = 0; i < NTHREADS; i++) {
        rv = apr_thread_cond_wait(cond, mutex[i]);
        ABTS_SUCCESS(rv);
    }

    /* Every worker has signalled exactly once, so nothing is pending. */
    for (i = 0; i < NTHREADS; i++) {
        rv = apr_thread_cond_timedwait(cond, mutex[i], 10000);
        ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(rv));

        rv = apr_thread_mutex_unlock(mutex[i]);
        ABTS_SUCCESS(rv);
    }

    for (i = 0; i < NTHREADS; i++) {
        apr_status_t retval;
        rv = apr_thread_join(&retval, thread[i]);
        ABTS_SUCCESS(rv);
    }

    rv = apr_thread_cond_destroy(cond);
    ABTS_SUCCESS(rv);

    for (i = 0; i < NTHREADS; i++) {
        rv = apr_thread_mutex_destroy(mutex[i]);
        ABTS_SUCCESS(rv);
    }
}
179
/*
 * Worker body: under the toolbox mutex, increment the counter that
 * box->data points to, wait on the condition variable, decrement the
 * counter after waking up, then release the mutex.
 */
static void lock_and_wait(toolbox_t *box)
{
    abts_case *tc = box->tc;
    apr_uint32_t *waiters = box->data;
    apr_status_t status;

    status = apr_thread_mutex_lock(box->mutex);
    ABTS_SUCCESS(status);

    /* Announce this thread before it starts waiting. */
    apr_atomic_inc32(waiters);

    status = apr_thread_cond_wait(box->cond, box->mutex);
    ABTS_SUCCESS(status);

    apr_atomic_dec32(waiters);

    status = apr_thread_mutex_unlock(box->mutex);
    ABTS_SUCCESS(status);
}
199
/*
 * Start NTHREADS workers running lock_and_wait(), poll until all of them
 * have registered in the shared counter, wake them with one broadcast and
 * check that the counter has dropped back to zero after joining them.
 *
 * tc:   test case used for reporting.
 * data: unused.
 */
static void broadcast_threads(abts_case *tc, void *data)
{
    apr_thread_t *workers[NTHREADS];
    apr_thread_mutex_t *lock = NULL;
    apr_thread_cond_t *cv = NULL;
    apr_uint32_t waiting = 0;
    apr_status_t status;
    toolbox_t shared;
    unsigned int n;

    status = apr_thread_cond_create(&cv, p);
    ABTS_SUCCESS(status);
    ABTS_PTR_NOTNULL(tc, cv);

    status = apr_thread_mutex_create(&lock, APR_THREAD_MUTEX_DEFAULT, p);
    ABTS_SUCCESS(status);
    ABTS_PTR_NOTNULL(tc, lock);

    status = apr_thread_mutex_lock(lock);
    ABTS_SUCCESS(status);

    /* All workers share a single toolbox and therefore a single counter. */
    shared.tc = tc;
    shared.data = &waiting;
    shared.mutex = lock;
    shared.cond = cv;
    shared.func = lock_and_wait;

    for (n = 0; n < NTHREADS; n++) {
        status = apr_thread_create(&workers[n], NULL, thread_routine,
                                   &shared, p);
        ABTS_SUCCESS(status);
    }

    /* Repeatedly drop the mutex for a while so workers can get in, then
     * re-take it and check whether every worker has registered. */
    for (;;) {
        status = apr_thread_mutex_unlock(lock);
        ABTS_SUCCESS(status);
        apr_sleep(100000);
        status = apr_thread_mutex_lock(lock);
        ABTS_SUCCESS(status);

        if (apr_atomic_read32(&waiting) == NTHREADS) {
            break;
        }
    }

    status = apr_thread_cond_broadcast(cv);
    ABTS_SUCCESS(status);

    status = apr_thread_mutex_unlock(lock);
    ABTS_SUCCESS(status);

    for (n = 0; n < NTHREADS; n++) {
        apr_status_t retval;
        status = apr_thread_join(&retval, workers[n]);
        ABTS_SUCCESS(status);
    }

    /* Each worker decrements the counter once it has been woken. */
    ABTS_INT_EQUAL(tc, 0, waiting);

    status = apr_thread_cond_destroy(cv);
    ABTS_SUCCESS(status);

    status = apr_thread_mutex_destroy(lock);
    ABTS_SUCCESS(status);
}
260
/*
 * Worker body: lock the toolbox mutex three times in a row, then wait on
 * the condition variable. The mutex is not unlocked before returning.
 */
static void nested_lock_and_wait(toolbox_t *box)
{
    abts_case *tc = box->tc;
    apr_status_t status;
    int depth;

    for (depth = 0; depth < 3; depth++) {
        status = apr_thread_mutex_lock(box->mutex);
        ABTS_SUCCESS(status);
    }

    status = apr_thread_cond_wait(box->cond, box->mutex);
    ABTS_SUCCESS(status);
}
278
/*
 * Worker body: lock the toolbox mutex three times, do a timed wait on the
 * condition variable (which must succeed, i.e. not time out), then unlock
 * only twice, so one level of the lock is still held on return.
 */
static void nested_lock_and_unlock(toolbox_t *box)
{
    abts_case *tc = box->tc;
    apr_status_t status;
    int depth;

    for (depth = 0; depth < 3; depth++) {
        status = apr_thread_mutex_lock(box->mutex);
        ABTS_SUCCESS(status);
    }

    status = apr_thread_cond_timedwait(box->cond, box->mutex, 2000000);
    ABTS_SUCCESS(status);

    /* Two unlocks against three locks: one level is left held. */
    for (depth = 0; depth < 2; depth++) {
        status = apr_thread_mutex_unlock(box->mutex);
        ABTS_SUCCESS(status);
    }
}
302
/*
 * Run a worker that waits on a condition variable while holding a nested
 * mutex several levels deep, wake it with a signal, and check that the
 * mutex is still reported busy after the worker thread has been joined.
 *
 * tc:   test case used for reporting.
 * data: pointer to a toolbox_fnptr_t selecting the worker body
 *       (nested_lock_and_wait or nested_lock_and_unlock).
 */
static void nested_wait(abts_case *tc, void *data)
{
    toolbox_fnptr_t *fnptr = data;
    toolbox_t box;
    apr_status_t rv, retval;
    apr_thread_cond_t *cond = NULL;
    apr_thread_t *thread = NULL;
    apr_thread_mutex_t *mutex = NULL;

    rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, p);
    ABTS_SUCCESS(rv);
    ABTS_PTR_NOTNULL(tc, mutex);

    rv = apr_thread_cond_create(&cond, p);
    ABTS_SUCCESS(rv);
    ABTS_PTR_NOTNULL(tc, cond);

    rv = apr_thread_mutex_lock(mutex);
    ABTS_SUCCESS(rv);

    box.tc = tc;
    /* The nested worker bodies do not read the payload; set it anyway so
     * no toolbox field is handed to another thread uninitialised. */
    box.data = NULL;
    box.cond = cond;
    box.mutex = mutex;
    box.func = fnptr->func;

    rv = apr_thread_create(&thread, NULL, thread_routine, &box, p);
    ABTS_SUCCESS(rv);

    rv = apr_thread_mutex_unlock(mutex);
    ABTS_SUCCESS(rv);

    /* yield the processor so the worker can reach its wait */
    apr_sleep(500000);

    rv = apr_thread_cond_signal(cond);
    ABTS_SUCCESS(rv);

    rv = apr_thread_join(&retval, thread);
    ABTS_SUCCESS(rv);

    /* Both worker bodies return with at least one level of the mutex
     * still held, so trylock from this thread must report EBUSY. */
    rv = apr_thread_mutex_trylock(mutex);
    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EBUSY(rv));

    rv = apr_thread_mutex_trylock(mutex);
    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EBUSY(rv));

    /* NOTE(review): cond and mutex are not destroyed here; presumably this
     * is deliberate because the mutex is still held — confirm. */
}
349
/* Shared state of the pipe producer/consumer test. In this file both
 * variables are only accessed while the test's mutex is held. */

/* Count of bytes written to the pipe that no consumer has claimed yet:
 * incremented by pipe_write(), decremented by pipe_consumer(). */
static volatile apr_uint64_t pipe_count;
/* Set to 1 by pipe_producer_consumer() after the producers have been
 * joined; consumers leave their loop once this is set and pipe_count is 0. */
static volatile apr_uint32_t exiting;
352
/*
 * Consumer worker: repeatedly claims one pending byte (by decrementing
 * pipe_count under the mutex) and then reads that byte from the pipe end
 * stored in box->data, checking that it is '.'. Returns once `exiting` is
 * set and no bytes are pending.
 */
static void pipe_consumer(toolbox_t *box)
{
    abts_case *tc = box->tc;
    apr_file_t *reader = box->data;
    apr_uint32_t consumed = 0;
    apr_status_t status;
    apr_size_t got;
    char byte;

    for (;;) {
        status = apr_thread_mutex_lock(box->mutex);
        ABTS_SUCCESS(status);

        /* Sleep until there is something to read or we are told to stop. */
        while (!(pipe_count || exiting)) {
            status = apr_thread_cond_wait(box->cond, box->mutex);
            ABTS_SUCCESS(status);
        }

        /* Nothing pending and shutdown requested: done. */
        if (!pipe_count && exiting) {
            status = apr_thread_mutex_unlock(box->mutex);
            ABTS_SUCCESS(status);
            break;
        }

        /* Claim one byte while still holding the mutex ... */
        pipe_count--;
        consumed++;

        status = apr_thread_mutex_unlock(box->mutex);
        ABTS_SUCCESS(status);

        /* ... and read it after the mutex has been released. */
        status = apr_file_read_full(reader, &byte, 1, &got);
        ABTS_SUCCESS(status);
        ABTS_SIZE_EQUAL(tc, 1, got);
        ABTS_TRUE(tc, byte == '.');
    }

    /* naive fairness test - it would be good to introduce or solidify
     * a solid test to ensure one thread is not starved.
     * ABTS_INT_EQUAL(tc, 1, !!consumed);
     */
}
394
/*
 * Write the single byte `ch` to the pipe end stored in box->data, then,
 * under the mutex, bump pipe_count. The condition variable is signalled
 * only when the count was zero before the increment.
 */
static void pipe_write(toolbox_t *box, char ch)
{
    abts_case *tc = box->tc;
    apr_file_t *writer = box->data;
    apr_status_t status;
    apr_size_t written;

    /* The byte goes into the pipe before it is accounted for. */
    status = apr_file_write_full(writer, &ch, 1, &written);
    ABTS_SUCCESS(status);
    ABTS_SIZE_EQUAL(tc, 1, written);

    status = apr_thread_mutex_lock(box->mutex);
    ABTS_SUCCESS(status);

    if (pipe_count == 0) {
        status = apr_thread_cond_signal(box->cond);
        ABTS_SUCCESS(status);
    }

    pipe_count++;

    status = apr_thread_mutex_unlock(box->mutex);
    ABTS_SUCCESS(status);
}
419
/*
 * Producer worker: writes 501 '.' bytes to the pipe, one pipe_write()
 * call per byte.
 */
static void pipe_producer(toolbox_t *box)
{
    apr_uint32_t sent;

    /* Inclusive upper bound: 0..500 gives 501 writes. */
    for (sent = 0; sent <= 500; sent++) {
        pipe_write(box, '.');
    }
}
428
/*
 * Producer/consumer test over a pipe. About 70% of NTHREADS threads run
 * pipe_consumer() and the rest run pipe_producer(); all share one mutex
 * and one condition variable. After the producers have finished, the
 * consumers are told to exit through `exiting` plus a broadcast.
 *
 * tc:   test case used for reporting.
 * data: unused.
 */
static void pipe_producer_consumer(abts_case *tc, void *data)
{
    apr_status_t rv;
    toolbox_t boxcons, boxprod;
    apr_thread_t *thread[NTHREADS];
    apr_thread_cond_t *cond = NULL;
    apr_thread_mutex_t *mutex = NULL;
    apr_file_t *in = NULL, *out = NULL;
    /* Integer arithmetic, so the split does not depend on how
     * NTHREADS * 0.70 happens to round before truncation. */
    apr_uint32_t i, ncons = (apr_uint32_t)((NTHREADS * 7) / 10);

    /* Start from a clean slate: the counters are file-scope, so a previous
     * run in the same process would otherwise leave `exiting` set.
     * No worker thread exists yet, so no locking is needed here. */
    pipe_count = 0;
    exiting = 0;

    rv = apr_file_pipe_create(&in, &out, p);
    ABTS_SUCCESS(rv);

    rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p);
    ABTS_SUCCESS(rv);
    ABTS_PTR_NOTNULL(tc, mutex);

    rv = apr_thread_cond_create(&cond, p);
    ABTS_SUCCESS(rv);
    ABTS_PTR_NOTNULL(tc, cond);

    /* Consumers read from `in`. */
    boxcons.tc = tc;
    boxcons.data = in;
    boxcons.mutex = mutex;
    boxcons.cond = cond;
    boxcons.func = pipe_consumer;

    for (i = 0; i < ncons; i++) {
        rv = apr_thread_create(&thread[i], NULL, thread_routine, &boxcons, p);
        ABTS_SUCCESS(rv);
    }

    /* Producers write to `out`. */
    boxprod.tc = tc;
    boxprod.data = out;
    boxprod.mutex = mutex;
    boxprod.cond = cond;
    boxprod.func = pipe_producer;

    for (; i < NTHREADS; i++) {
        rv = apr_thread_create(&thread[i], NULL, thread_routine, &boxprod, p);
        ABTS_SUCCESS(rv);
    }

    /* Wait for the producers (upper part of the array) first. */
    for (i = ncons; i < NTHREADS; i++) {
        apr_status_t retval;
        rv = apr_thread_join(&retval, thread[i]);
        ABTS_SUCCESS(rv);
    }

    rv = apr_thread_mutex_lock(mutex);
    ABTS_SUCCESS(rv);

    /* Set under the mutex so a consumer cannot miss it between checking
     * its predicate and going to sleep. */
    exiting = 1;

    rv = apr_thread_cond_broadcast(cond);
    ABTS_SUCCESS(rv);

    rv = apr_thread_mutex_unlock(mutex);
    ABTS_SUCCESS(rv);

    for (i = 0; i < ncons; i++) {
        apr_status_t retval;
        rv = apr_thread_join(&retval, thread[i]);
        ABTS_SUCCESS(rv);
    }

    rv = apr_thread_cond_destroy(cond);
    ABTS_SUCCESS(rv);

    rv = apr_thread_mutex_destroy(mutex);
    ABTS_SUCCESS(rv);

    rv = apr_file_close(in);
    ABTS_SUCCESS(rv);

    rv = apr_file_close(out);
    ABTS_SUCCESS(rv);
}
507
/*
 * Turn marker shared by ping(), pong() and ping_pong(); in this file it is
 * only accessed while the test's mutex is held.
 *
 * Declared static, like the other file-scope test state, so this very
 * generic name cannot clash with another object named `state` at link time.
 */
static volatile enum {
    TOSS,   /* initial value set by ping_pong() before the players start */
    PING,   /* value ping() expects to find after waking up */
    PONG,   /* value pong() expects to find after waking up */
    OVER    /* set by ping_pong() to make both players leave their loops */
} state;
514
/*
 * "Ping" player: under the mutex, repeatedly signals the condition
 * variable, hands the turn to the other side by setting state to PONG and
 * waits. After each wake-up the state must be PING or OVER; OVER ends the
 * loop. On the way out it releases the mutex and broadcasts once.
 */
static void ping(toolbox_t *box)
{
    abts_case *tc = box->tc;
    apr_status_t status;

    status = apr_thread_mutex_lock(box->mutex);
    ABTS_SUCCESS(status);

    if (state == TOSS) {
        state = PING;
    }

    for (;;) {
        status = apr_thread_cond_signal(box->cond);
        ABTS_SUCCESS(status);

        state = PONG;

        status = apr_thread_cond_wait(box->cond, box->mutex);
        ABTS_SUCCESS(status);

        ABTS_TRUE(tc, state == PING || state == OVER);

        if (state == OVER) {
            break;
        }
    }

    status = apr_thread_mutex_unlock(box->mutex);
    ABTS_SUCCESS(status);

    /* Wake the other player in case it is still waiting. */
    status = apr_thread_cond_broadcast(box->cond);
    ABTS_SUCCESS(status);
}
544
/*
 * "Pong" player, the mirror image of ping(): under the mutex, repeatedly
 * signals the condition variable, hands the turn over by setting state to
 * PING and waits. After each wake-up the state must be PONG or OVER; OVER
 * ends the loop. On the way out it releases the mutex and broadcasts once.
 */
static void pong(toolbox_t *box)
{
    abts_case *tc = box->tc;
    apr_status_t status;

    status = apr_thread_mutex_lock(box->mutex);
    ABTS_SUCCESS(status);

    if (state == TOSS) {
        state = PONG;
    }

    for (;;) {
        status = apr_thread_cond_signal(box->cond);
        ABTS_SUCCESS(status);

        state = PING;

        status = apr_thread_cond_wait(box->cond, box->mutex);
        ABTS_SUCCESS(status);

        ABTS_TRUE(tc, state == PONG || state == OVER);

        if (state == OVER) {
            break;
        }
    }

    status = apr_thread_mutex_unlock(box->mutex);
    ABTS_SUCCESS(status);

    /* Wake the other player in case it is still waiting. */
    status = apr_thread_cond_broadcast(box->cond);
    ABTS_SUCCESS(status);
}
574
/*
 * Start one ping() and one pong() thread on a shared mutex and condition
 * variable, let them bounce the turn back and forth while this thread
 * sleeps, then set state to OVER and join both.
 *
 * tc:   test case used for reporting.
 * data: unused.
 */
static void ping_pong(abts_case *tc, void *data)
{
    apr_thread_t *ping_thread = NULL;
    apr_thread_t *pong_thread = NULL;
    apr_thread_mutex_t *lock = NULL;
    apr_thread_cond_t *cv = NULL;
    toolbox_t ping_box;
    toolbox_t pong_box;
    apr_status_t status;
    apr_status_t retval;

    status = apr_thread_mutex_create(&lock, APR_THREAD_MUTEX_DEFAULT, p);
    ABTS_SUCCESS(status);
    ABTS_PTR_NOTNULL(tc, lock);

    status = apr_thread_cond_create(&cv, p);
    ABTS_SUCCESS(status);
    ABTS_PTR_NOTNULL(tc, cv);

    /* Keep both players blocked on the mutex until the initial state
     * has been set. */
    status = apr_thread_mutex_lock(lock);
    ABTS_SUCCESS(status);

    ping_box.tc = tc;
    ping_box.data = NULL;
    ping_box.mutex = lock;
    ping_box.cond = cv;
    ping_box.func = ping;

    status = apr_thread_create(&ping_thread, NULL, thread_routine,
                               &ping_box, p);
    ABTS_SUCCESS(status);

    pong_box.tc = tc;
    pong_box.data = NULL;
    pong_box.mutex = lock;
    pong_box.cond = cv;
    pong_box.func = pong;

    status = apr_thread_create(&pong_thread, NULL, thread_routine,
                               &pong_box, p);
    ABTS_SUCCESS(status);

    state = TOSS;

    status = apr_thread_mutex_unlock(lock);
    ABTS_SUCCESS(status);

    /* Let the two players run. */
    apr_sleep(3000000);

    status = apr_thread_mutex_lock(lock);
    ABTS_SUCCESS(status);

    /* No signal is sent here; whichever player wakes next sees OVER. */
    state = OVER;

    status = apr_thread_mutex_unlock(lock);
    ABTS_SUCCESS(status);

    status = apr_thread_join(&retval, ping_thread);
    ABTS_SUCCESS(status);

    status = apr_thread_join(&retval, pong_thread);
    ABTS_SUCCESS(status);

    status = apr_thread_cond_destroy(cv);
    ABTS_SUCCESS(status);

    status = apr_thread_mutex_destroy(lock);
    ABTS_SUCCESS(status);
}
#endif /* APR_HAS_THREADS */
640
641 #if !APR_HAS_THREADS
/*
 * Placeholder test used when APR is built without thread support: it only
 * reports that the condition-variable tests are not implemented.
 *
 * tc:   test case used for reporting.
 * data: unused.
 */
static void threads_not_impl(abts_case *tc, void *data)
{
    (void)data;

    ABTS_NOT_IMPL(tc, "Threads not implemented on this platform");
}
646 #endif
647
/*
 * Register the condition-variable tests with the given suite and return
 * the suite. Without thread support a single "not implemented" test is
 * registered instead.
 */
abts_suite *testcond(abts_suite *suite)
{
#if APR_HAS_THREADS
    /* One descriptor per nested_wait() variant. */
    toolbox_fnptr_t wait_variant;
    toolbox_fnptr_t unlock_variant;
#endif
    /* No trailing semicolon, as in the original; presumably ADD_SUITE
     * supplies its own terminator — confirm against testutil.h. */
    suite = ADD_SUITE(suite)

#if APR_HAS_THREADS
    wait_variant.func = nested_lock_and_wait;
    unlock_variant.func = nested_lock_and_unlock;

    abts_run_test(suite, lost_signal, NULL);
    abts_run_test(suite, dynamic_binding, NULL);
    abts_run_test(suite, broadcast_threads, NULL);
    abts_run_test(suite, nested_wait, &wait_variant);
    abts_run_test(suite, nested_wait, &unlock_variant);
    abts_run_test(suite, pipe_producer_consumer, NULL);
    abts_run_test(suite, ping_pong, NULL);
#else
    abts_run_test(suite, threads_not_impl, NULL);
#endif

    return suite;
}
671