1*b095b053SXin Li /* Standard C headers */
2*b095b053SXin Li #include <assert.h>
3*b095b053SXin Li #include <stdbool.h>
4*b095b053SXin Li #include <stdint.h>
5*b095b053SXin Li #include <stdlib.h>
6*b095b053SXin Li #include <string.h>
7*b095b053SXin Li
8*b095b053SXin Li /* Configuration header */
9*b095b053SXin Li #include "threadpool-common.h"
10*b095b053SXin Li
11*b095b053SXin Li /* Mach headers */
12*b095b053SXin Li #include <dispatch/dispatch.h>
13*b095b053SXin Li #include <sys/types.h>
14*b095b053SXin Li #include <sys/sysctl.h>
15*b095b053SXin Li
16*b095b053SXin Li /* Public library header */
17*b095b053SXin Li #include <pthreadpool.h>
18*b095b053SXin Li
19*b095b053SXin Li /* Internal library headers */
20*b095b053SXin Li #include "threadpool-atomics.h"
21*b095b053SXin Li #include "threadpool-object.h"
22*b095b053SXin Li #include "threadpool-utils.h"
23*b095b053SXin Li
thread_main(void * arg,size_t thread_index)24*b095b053SXin Li static void thread_main(void* arg, size_t thread_index) {
25*b095b053SXin Li struct pthreadpool* threadpool = (struct pthreadpool*) arg;
26*b095b053SXin Li struct thread_info* thread = &threadpool->threads[thread_index];
27*b095b053SXin Li
28*b095b053SXin Li const uint32_t flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
29*b095b053SXin Li const thread_function_t thread_function =
30*b095b053SXin Li (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
31*b095b053SXin Li
32*b095b053SXin Li struct fpu_state saved_fpu_state = { 0 };
33*b095b053SXin Li if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
34*b095b053SXin Li saved_fpu_state = get_fpu_state();
35*b095b053SXin Li disable_fpu_denormals();
36*b095b053SXin Li }
37*b095b053SXin Li
38*b095b053SXin Li thread_function(threadpool, thread);
39*b095b053SXin Li
40*b095b053SXin Li if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
41*b095b053SXin Li set_fpu_state(saved_fpu_state);
42*b095b053SXin Li }
43*b095b053SXin Li }
44*b095b053SXin Li
pthreadpool_create(size_t threads_count)45*b095b053SXin Li struct pthreadpool* pthreadpool_create(size_t threads_count) {
46*b095b053SXin Li if (threads_count == 0) {
47*b095b053SXin Li int threads = 1;
48*b095b053SXin Li size_t sizeof_threads = sizeof(threads);
49*b095b053SXin Li if (sysctlbyname("hw.logicalcpu_max", &threads, &sizeof_threads, NULL, 0) != 0) {
50*b095b053SXin Li return NULL;
51*b095b053SXin Li }
52*b095b053SXin Li
53*b095b053SXin Li if (threads <= 0) {
54*b095b053SXin Li return NULL;
55*b095b053SXin Li }
56*b095b053SXin Li
57*b095b053SXin Li threads_count = (size_t) threads;
58*b095b053SXin Li }
59*b095b053SXin Li
60*b095b053SXin Li struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
61*b095b053SXin Li if (threadpool == NULL) {
62*b095b053SXin Li return NULL;
63*b095b053SXin Li }
64*b095b053SXin Li threadpool->threads_count = fxdiv_init_size_t(threads_count);
65*b095b053SXin Li for (size_t tid = 0; tid < threads_count; tid++) {
66*b095b053SXin Li threadpool->threads[tid].thread_number = tid;
67*b095b053SXin Li }
68*b095b053SXin Li
69*b095b053SXin Li /* Thread pool with a single thread computes everything on the caller thread. */
70*b095b053SXin Li if (threads_count > 1) {
71*b095b053SXin Li threadpool->execution_semaphore = dispatch_semaphore_create(1);
72*b095b053SXin Li }
73*b095b053SXin Li return threadpool;
74*b095b053SXin Li }
75*b095b053SXin Li
pthreadpool_parallelize(struct pthreadpool * threadpool,thread_function_t thread_function,const void * params,size_t params_size,void * task,void * context,size_t linear_range,uint32_t flags)76*b095b053SXin Li PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
77*b095b053SXin Li struct pthreadpool* threadpool,
78*b095b053SXin Li thread_function_t thread_function,
79*b095b053SXin Li const void* params,
80*b095b053SXin Li size_t params_size,
81*b095b053SXin Li void* task,
82*b095b053SXin Li void* context,
83*b095b053SXin Li size_t linear_range,
84*b095b053SXin Li uint32_t flags)
85*b095b053SXin Li {
86*b095b053SXin Li assert(threadpool != NULL);
87*b095b053SXin Li assert(thread_function != NULL);
88*b095b053SXin Li assert(task != NULL);
89*b095b053SXin Li assert(linear_range > 1);
90*b095b053SXin Li
91*b095b053SXin Li /* Protect the global threadpool structures */
92*b095b053SXin Li dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER);
93*b095b053SXin Li
94*b095b053SXin Li /* Setup global arguments */
95*b095b053SXin Li pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
96*b095b053SXin Li pthreadpool_store_relaxed_void_p(&threadpool->task, task);
97*b095b053SXin Li pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
98*b095b053SXin Li pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
99*b095b053SXin Li
100*b095b053SXin Li /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
101*b095b053SXin Li const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
102*b095b053SXin Li
103*b095b053SXin Li if (params_size != 0) {
104*b095b053SXin Li memcpy(&threadpool->params, params, params_size);
105*b095b053SXin Li }
106*b095b053SXin Li
107*b095b053SXin Li /* Spread the work between threads */
108*b095b053SXin Li const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
109*b095b053SXin Li size_t range_start = 0;
110*b095b053SXin Li for (size_t tid = 0; tid < threads_count.value; tid++) {
111*b095b053SXin Li struct thread_info* thread = &threadpool->threads[tid];
112*b095b053SXin Li const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
113*b095b053SXin Li const size_t range_end = range_start + range_length;
114*b095b053SXin Li pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
115*b095b053SXin Li pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
116*b095b053SXin Li pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
117*b095b053SXin Li
118*b095b053SXin Li /* The next subrange starts where the previous ended */
119*b095b053SXin Li range_start = range_end;
120*b095b053SXin Li }
121*b095b053SXin Li
122*b095b053SXin Li dispatch_apply_f(threads_count.value, DISPATCH_APPLY_AUTO, threadpool, thread_main);
123*b095b053SXin Li
124*b095b053SXin Li /* Unprotect the global threadpool structures */
125*b095b053SXin Li dispatch_semaphore_signal(threadpool->execution_semaphore);
126*b095b053SXin Li }
127*b095b053SXin Li
pthreadpool_destroy(struct pthreadpool * threadpool)128*b095b053SXin Li void pthreadpool_destroy(struct pthreadpool* threadpool) {
129*b095b053SXin Li if (threadpool != NULL) {
130*b095b053SXin Li if (threadpool->execution_semaphore != NULL) {
131*b095b053SXin Li /* Release resources */
132*b095b053SXin Li dispatch_release(threadpool->execution_semaphore);
133*b095b053SXin Li }
134*b095b053SXin Li pthreadpool_deallocate(threadpool);
135*b095b053SXin Li }
136*b095b053SXin Li }
137