1 /* Standard C headers */
2 #include <assert.h>
3 #include <stdbool.h>
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7
8 /* Configuration header */
9 #include "threadpool-common.h"
10
11 /* Mach headers */
12 #include <dispatch/dispatch.h>
13 #include <sys/types.h>
14 #include <sys/sysctl.h>
15
16 /* Public library header */
17 #include <pthreadpool.h>
18
19 /* Internal library headers */
20 #include "threadpool-atomics.h"
21 #include "threadpool-object.h"
22 #include "threadpool-utils.h"
23
thread_main(void * arg,size_t thread_index)24 static void thread_main(void* arg, size_t thread_index) {
25 struct pthreadpool* threadpool = (struct pthreadpool*) arg;
26 struct thread_info* thread = &threadpool->threads[thread_index];
27
28 const uint32_t flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
29 const thread_function_t thread_function =
30 (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
31
32 struct fpu_state saved_fpu_state = { 0 };
33 if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
34 saved_fpu_state = get_fpu_state();
35 disable_fpu_denormals();
36 }
37
38 thread_function(threadpool, thread);
39
40 if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
41 set_fpu_state(saved_fpu_state);
42 }
43 }
44
pthreadpool_create(size_t threads_count)45 struct pthreadpool* pthreadpool_create(size_t threads_count) {
46 if (threads_count == 0) {
47 int threads = 1;
48 size_t sizeof_threads = sizeof(threads);
49 if (sysctlbyname("hw.logicalcpu_max", &threads, &sizeof_threads, NULL, 0) != 0) {
50 return NULL;
51 }
52
53 if (threads <= 0) {
54 return NULL;
55 }
56
57 threads_count = (size_t) threads;
58 }
59
60 struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
61 if (threadpool == NULL) {
62 return NULL;
63 }
64 threadpool->threads_count = fxdiv_init_size_t(threads_count);
65 for (size_t tid = 0; tid < threads_count; tid++) {
66 threadpool->threads[tid].thread_number = tid;
67 }
68
69 /* Thread pool with a single thread computes everything on the caller thread. */
70 if (threads_count > 1) {
71 threadpool->execution_semaphore = dispatch_semaphore_create(1);
72 }
73 return threadpool;
74 }
75
pthreadpool_parallelize(struct pthreadpool * threadpool,thread_function_t thread_function,const void * params,size_t params_size,void * task,void * context,size_t linear_range,uint32_t flags)76 PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
77 struct pthreadpool* threadpool,
78 thread_function_t thread_function,
79 const void* params,
80 size_t params_size,
81 void* task,
82 void* context,
83 size_t linear_range,
84 uint32_t flags)
85 {
86 assert(threadpool != NULL);
87 assert(thread_function != NULL);
88 assert(task != NULL);
89 assert(linear_range > 1);
90
91 /* Protect the global threadpool structures */
92 dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER);
93
94 /* Setup global arguments */
95 pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
96 pthreadpool_store_relaxed_void_p(&threadpool->task, task);
97 pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
98 pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
99
100 /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
101 const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
102
103 if (params_size != 0) {
104 memcpy(&threadpool->params, params, params_size);
105 }
106
107 /* Spread the work between threads */
108 const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
109 size_t range_start = 0;
110 for (size_t tid = 0; tid < threads_count.value; tid++) {
111 struct thread_info* thread = &threadpool->threads[tid];
112 const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
113 const size_t range_end = range_start + range_length;
114 pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
115 pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
116 pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
117
118 /* The next subrange starts where the previous ended */
119 range_start = range_end;
120 }
121
122 dispatch_apply_f(threads_count.value, DISPATCH_APPLY_AUTO, threadpool, thread_main);
123
124 /* Unprotect the global threadpool structures */
125 dispatch_semaphore_signal(threadpool->execution_semaphore);
126 }
127
pthreadpool_destroy(struct pthreadpool * threadpool)128 void pthreadpool_destroy(struct pthreadpool* threadpool) {
129 if (threadpool != NULL) {
130 if (threadpool->execution_semaphore != NULL) {
131 /* Release resources */
132 dispatch_release(threadpool->execution_semaphore);
133 }
134 pthreadpool_deallocate(threadpool);
135 }
136 }
137