1*b095b053SXin Li /* Standard C headers */
2*b095b053SXin Li #include <assert.h>
3*b095b053SXin Li #include <stdbool.h>
4*b095b053SXin Li #include <stdint.h>
5*b095b053SXin Li #include <stdlib.h>
6*b095b053SXin Li #include <string.h>
7*b095b053SXin Li
8*b095b053SXin Li /* Configuration header */
9*b095b053SXin Li #include "threadpool-common.h"
10*b095b053SXin Li
11*b095b053SXin Li /* Windows headers */
12*b095b053SXin Li #include <windows.h>
13*b095b053SXin Li
14*b095b053SXin Li /* Public library header */
15*b095b053SXin Li #include <pthreadpool.h>
16*b095b053SXin Li
17*b095b053SXin Li /* Internal library headers */
18*b095b053SXin Li #include "threadpool-atomics.h"
19*b095b053SXin Li #include "threadpool-object.h"
20*b095b053SXin Li #include "threadpool-utils.h"
21*b095b053SXin Li
22*b095b053SXin Li
checkin_worker_thread(struct pthreadpool * threadpool,uint32_t event_index)23*b095b053SXin Li static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) {
24*b095b053SXin Li if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
25*b095b053SXin Li SetEvent(threadpool->completion_event[event_index]);
26*b095b053SXin Li }
27*b095b053SXin Li }
28*b095b053SXin Li
wait_worker_threads(struct pthreadpool * threadpool,uint32_t event_index)29*b095b053SXin Li static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) {
30*b095b053SXin Li /* Initial check */
31*b095b053SXin Li size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
32*b095b053SXin Li if (active_threads == 0) {
33*b095b053SXin Li return;
34*b095b053SXin Li }
35*b095b053SXin Li
36*b095b053SXin Li /* Spin-wait */
37*b095b053SXin Li for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
38*b095b053SXin Li pthreadpool_yield();
39*b095b053SXin Li
40*b095b053SXin Li active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
41*b095b053SXin Li if (active_threads == 0) {
42*b095b053SXin Li return;
43*b095b053SXin Li }
44*b095b053SXin Li }
45*b095b053SXin Li
46*b095b053SXin Li /* Fall-back to event wait */
47*b095b053SXin Li const DWORD wait_status = WaitForSingleObject(threadpool->completion_event[event_index], INFINITE);
48*b095b053SXin Li assert(wait_status == WAIT_OBJECT_0);
49*b095b053SXin Li assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0);
50*b095b053SXin Li }
51*b095b053SXin Li
wait_for_new_command(struct pthreadpool * threadpool,uint32_t last_command,uint32_t last_flags)52*b095b053SXin Li static uint32_t wait_for_new_command(
53*b095b053SXin Li struct pthreadpool* threadpool,
54*b095b053SXin Li uint32_t last_command,
55*b095b053SXin Li uint32_t last_flags)
56*b095b053SXin Li {
57*b095b053SXin Li uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
58*b095b053SXin Li if (command != last_command) {
59*b095b053SXin Li return command;
60*b095b053SXin Li }
61*b095b053SXin Li
62*b095b053SXin Li if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
63*b095b053SXin Li /* Spin-wait loop */
64*b095b053SXin Li for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
65*b095b053SXin Li pthreadpool_yield();
66*b095b053SXin Li
67*b095b053SXin Li command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
68*b095b053SXin Li if (command != last_command) {
69*b095b053SXin Li return command;
70*b095b053SXin Li }
71*b095b053SXin Li }
72*b095b053SXin Li }
73*b095b053SXin Li
74*b095b053SXin Li /* Spin-wait disabled or timed out, fall back to event wait */
75*b095b053SXin Li const uint32_t event_index = (last_command >> 31);
76*b095b053SXin Li const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE);
77*b095b053SXin Li assert(wait_status == WAIT_OBJECT_0);
78*b095b053SXin Li
79*b095b053SXin Li command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
80*b095b053SXin Li assert(command != last_command);
81*b095b053SXin Li return command;
82*b095b053SXin Li }
83*b095b053SXin Li
thread_main(LPVOID arg)84*b095b053SXin Li static DWORD WINAPI thread_main(LPVOID arg) {
85*b095b053SXin Li struct thread_info* thread = (struct thread_info*) arg;
86*b095b053SXin Li struct pthreadpool* threadpool = thread->threadpool;
87*b095b053SXin Li uint32_t last_command = threadpool_command_init;
88*b095b053SXin Li struct fpu_state saved_fpu_state = { 0 };
89*b095b053SXin Li uint32_t flags = 0;
90*b095b053SXin Li
91*b095b053SXin Li /* Check in */
92*b095b053SXin Li checkin_worker_thread(threadpool, 0);
93*b095b053SXin Li
94*b095b053SXin Li /* Monitor new commands and act accordingly */
95*b095b053SXin Li for (;;) {
96*b095b053SXin Li uint32_t command = wait_for_new_command(threadpool, last_command, flags);
97*b095b053SXin Li pthreadpool_fence_acquire();
98*b095b053SXin Li
99*b095b053SXin Li flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
100*b095b053SXin Li
101*b095b053SXin Li /* Process command */
102*b095b053SXin Li switch (command & THREADPOOL_COMMAND_MASK) {
103*b095b053SXin Li case threadpool_command_parallelize:
104*b095b053SXin Li {
105*b095b053SXin Li const thread_function_t thread_function =
106*b095b053SXin Li (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
107*b095b053SXin Li if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
108*b095b053SXin Li saved_fpu_state = get_fpu_state();
109*b095b053SXin Li disable_fpu_denormals();
110*b095b053SXin Li }
111*b095b053SXin Li
112*b095b053SXin Li thread_function(threadpool, thread);
113*b095b053SXin Li if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
114*b095b053SXin Li set_fpu_state(saved_fpu_state);
115*b095b053SXin Li }
116*b095b053SXin Li break;
117*b095b053SXin Li }
118*b095b053SXin Li case threadpool_command_shutdown:
119*b095b053SXin Li /* Exit immediately: the master thread is waiting on pthread_join */
120*b095b053SXin Li return 0;
121*b095b053SXin Li case threadpool_command_init:
122*b095b053SXin Li /* To inhibit compiler warning */
123*b095b053SXin Li break;
124*b095b053SXin Li }
125*b095b053SXin Li /* Notify the master thread that we finished processing */
126*b095b053SXin Li const uint32_t event_index = command >> 31;
127*b095b053SXin Li checkin_worker_thread(threadpool, event_index);
128*b095b053SXin Li /* Update last command */
129*b095b053SXin Li last_command = command;
130*b095b053SXin Li };
131*b095b053SXin Li return 0;
132*b095b053SXin Li }
133*b095b053SXin Li
pthreadpool_create(size_t threads_count)134*b095b053SXin Li struct pthreadpool* pthreadpool_create(size_t threads_count) {
135*b095b053SXin Li if (threads_count == 0) {
136*b095b053SXin Li SYSTEM_INFO system_info;
137*b095b053SXin Li ZeroMemory(&system_info, sizeof(system_info));
138*b095b053SXin Li GetSystemInfo(&system_info);
139*b095b053SXin Li threads_count = (size_t) system_info.dwNumberOfProcessors;
140*b095b053SXin Li }
141*b095b053SXin Li
142*b095b053SXin Li struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
143*b095b053SXin Li if (threadpool == NULL) {
144*b095b053SXin Li return NULL;
145*b095b053SXin Li }
146*b095b053SXin Li threadpool->threads_count = fxdiv_init_size_t(threads_count);
147*b095b053SXin Li for (size_t tid = 0; tid < threads_count; tid++) {
148*b095b053SXin Li threadpool->threads[tid].thread_number = tid;
149*b095b053SXin Li threadpool->threads[tid].threadpool = threadpool;
150*b095b053SXin Li }
151*b095b053SXin Li
152*b095b053SXin Li /* Thread pool with a single thread computes everything on the caller thread. */
153*b095b053SXin Li if (threads_count > 1) {
154*b095b053SXin Li threadpool->execution_mutex = CreateMutexW(
155*b095b053SXin Li NULL /* mutex attributes */,
156*b095b053SXin Li FALSE /* initially owned */,
157*b095b053SXin Li NULL /* name */);
158*b095b053SXin Li for (size_t i = 0; i < 2; i++) {
159*b095b053SXin Li threadpool->completion_event[i] = CreateEventW(
160*b095b053SXin Li NULL /* event attributes */,
161*b095b053SXin Li TRUE /* manual-reset event: yes */,
162*b095b053SXin Li FALSE /* initial state: nonsignaled */,
163*b095b053SXin Li NULL /* name */);
164*b095b053SXin Li threadpool->command_event[i] = CreateEventW(
165*b095b053SXin Li NULL /* event attributes */,
166*b095b053SXin Li TRUE /* manual-reset event: yes */,
167*b095b053SXin Li FALSE /* initial state: nonsignaled */,
168*b095b053SXin Li NULL /* name */);
169*b095b053SXin Li }
170*b095b053SXin Li
171*b095b053SXin Li pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
172*b095b053SXin Li
173*b095b053SXin Li /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
174*b095b053SXin Li for (size_t tid = 1; tid < threads_count; tid++) {
175*b095b053SXin Li threadpool->threads[tid].thread_handle = CreateThread(
176*b095b053SXin Li NULL /* thread attributes */,
177*b095b053SXin Li 0 /* stack size: default */,
178*b095b053SXin Li &thread_main,
179*b095b053SXin Li &threadpool->threads[tid],
180*b095b053SXin Li 0 /* creation flags */,
181*b095b053SXin Li NULL /* thread id */);
182*b095b053SXin Li }
183*b095b053SXin Li
184*b095b053SXin Li /* Wait until all threads initialize */
185*b095b053SXin Li wait_worker_threads(threadpool, 0);
186*b095b053SXin Li }
187*b095b053SXin Li return threadpool;
188*b095b053SXin Li }
189*b095b053SXin Li
pthreadpool_parallelize(struct pthreadpool * threadpool,thread_function_t thread_function,const void * params,size_t params_size,void * task,void * context,size_t linear_range,uint32_t flags)190*b095b053SXin Li PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
191*b095b053SXin Li struct pthreadpool* threadpool,
192*b095b053SXin Li thread_function_t thread_function,
193*b095b053SXin Li const void* params,
194*b095b053SXin Li size_t params_size,
195*b095b053SXin Li void* task,
196*b095b053SXin Li void* context,
197*b095b053SXin Li size_t linear_range,
198*b095b053SXin Li uint32_t flags)
199*b095b053SXin Li {
200*b095b053SXin Li assert(threadpool != NULL);
201*b095b053SXin Li assert(thread_function != NULL);
202*b095b053SXin Li assert(task != NULL);
203*b095b053SXin Li assert(linear_range > 1);
204*b095b053SXin Li
205*b095b053SXin Li /* Protect the global threadpool structures */
206*b095b053SXin Li const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE);
207*b095b053SXin Li assert(wait_status == WAIT_OBJECT_0);
208*b095b053SXin Li
209*b095b053SXin Li /* Setup global arguments */
210*b095b053SXin Li pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
211*b095b053SXin Li pthreadpool_store_relaxed_void_p(&threadpool->task, task);
212*b095b053SXin Li pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
213*b095b053SXin Li pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
214*b095b053SXin Li
215*b095b053SXin Li const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
216*b095b053SXin Li pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
217*b095b053SXin Li
218*b095b053SXin Li if (params_size != 0) {
219*b095b053SXin Li CopyMemory(&threadpool->params, params, params_size);
220*b095b053SXin Li pthreadpool_fence_release();
221*b095b053SXin Li }
222*b095b053SXin Li
223*b095b053SXin Li /* Spread the work between threads */
224*b095b053SXin Li const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
225*b095b053SXin Li size_t range_start = 0;
226*b095b053SXin Li for (size_t tid = 0; tid < threads_count.value; tid++) {
227*b095b053SXin Li struct thread_info* thread = &threadpool->threads[tid];
228*b095b053SXin Li const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
229*b095b053SXin Li const size_t range_end = range_start + range_length;
230*b095b053SXin Li pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
231*b095b053SXin Li pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
232*b095b053SXin Li pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
233*b095b053SXin Li
234*b095b053SXin Li /* The next subrange starts where the previous ended */
235*b095b053SXin Li range_start = range_end;
236*b095b053SXin Li }
237*b095b053SXin Li
238*b095b053SXin Li /*
239*b095b053SXin Li * Update the threadpool command.
240*b095b053SXin Li * Imporantly, do it after initializing command parameters (range, task, argument, flags)
241*b095b053SXin Li * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
242*b095b053SXin Li * to ensure the unmasked command is different then the last command, because worker threads
243*b095b053SXin Li * monitor for change in the unmasked command.
244*b095b053SXin Li */
245*b095b053SXin Li const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
246*b095b053SXin Li const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
247*b095b053SXin Li
248*b095b053SXin Li /*
249*b095b053SXin Li * Reset the command event for the next command.
250*b095b053SXin Li * It is important to reset the event before writing out the new command, because as soon as the worker threads
251*b095b053SXin Li * observe the new command, they may process it and switch to waiting on the next command event.
252*b095b053SXin Li *
253*b095b053SXin Li * Note: the event is different from the command event signalled in this update.
254*b095b053SXin Li */
255*b095b053SXin Li const uint32_t event_index = (old_command >> 31);
256*b095b053SXin Li BOOL reset_event_status = ResetEvent(threadpool->command_event[event_index ^ 1]);
257*b095b053SXin Li assert(reset_event_status != FALSE);
258*b095b053SXin Li
259*b095b053SXin Li /*
260*b095b053SXin Li * Store the command with release semantics to guarantee that if a worker thread observes
261*b095b053SXin Li * the new command value, it also observes the updated command parameters.
262*b095b053SXin Li *
263*b095b053SXin Li * Note: release semantics is necessary, because the workers might be waiting in a spin-loop
264*b095b053SXin Li * rather than on the event object.
265*b095b053SXin Li */
266*b095b053SXin Li pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
267*b095b053SXin Li
268*b095b053SXin Li /*
269*b095b053SXin Li * Signal the event to wake up the threads.
270*b095b053SXin Li * Event in use must be switched after every submitted command to avoid race conditions.
271*b095b053SXin Li * Choose the event based on the high bit of the command, which is flipped on every update.
272*b095b053SXin Li */
273*b095b053SXin Li const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
274*b095b053SXin Li assert(set_event_status != FALSE);
275*b095b053SXin Li
276*b095b053SXin Li /* Save and modify FPU denormals control, if needed */
277*b095b053SXin Li struct fpu_state saved_fpu_state = { 0 };
278*b095b053SXin Li if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
279*b095b053SXin Li saved_fpu_state = get_fpu_state();
280*b095b053SXin Li disable_fpu_denormals();
281*b095b053SXin Li }
282*b095b053SXin Li
283*b095b053SXin Li /* Do computations as worker #0 */
284*b095b053SXin Li thread_function(threadpool, &threadpool->threads[0]);
285*b095b053SXin Li
286*b095b053SXin Li /* Restore FPU denormals control, if needed */
287*b095b053SXin Li if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
288*b095b053SXin Li set_fpu_state(saved_fpu_state);
289*b095b053SXin Li }
290*b095b053SXin Li
291*b095b053SXin Li /*
292*b095b053SXin Li * Wait until the threads finish computation
293*b095b053SXin Li * Use the complementary event because it corresponds to the new command.
294*b095b053SXin Li */
295*b095b053SXin Li wait_worker_threads(threadpool, event_index ^ 1);
296*b095b053SXin Li
297*b095b053SXin Li /*
298*b095b053SXin Li * Reset the completion event for the next command.
299*b095b053SXin Li * Note: the event is different from the one used for waiting in this update.
300*b095b053SXin Li */
301*b095b053SXin Li reset_event_status = ResetEvent(threadpool->completion_event[event_index]);
302*b095b053SXin Li assert(reset_event_status != FALSE);
303*b095b053SXin Li
304*b095b053SXin Li /* Make changes by other threads visible to this thread */
305*b095b053SXin Li pthreadpool_fence_acquire();
306*b095b053SXin Li
307*b095b053SXin Li /* Unprotect the global threadpool structures */
308*b095b053SXin Li const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex);
309*b095b053SXin Li assert(release_mutex_status != FALSE);
310*b095b053SXin Li }
311*b095b053SXin Li
pthreadpool_destroy(struct pthreadpool * threadpool)312*b095b053SXin Li void pthreadpool_destroy(struct pthreadpool* threadpool) {
313*b095b053SXin Li if (threadpool != NULL) {
314*b095b053SXin Li const size_t threads_count = threadpool->threads_count.value;
315*b095b053SXin Li if (threads_count > 1) {
316*b095b053SXin Li pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
317*b095b053SXin Li
318*b095b053SXin Li /*
319*b095b053SXin Li * Store the command with release semantics to guarantee that if a worker thread observes
320*b095b053SXin Li * the new command value, it also observes the updated active_threads values.
321*b095b053SXin Li */
322*b095b053SXin Li const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
323*b095b053SXin Li pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
324*b095b053SXin Li
325*b095b053SXin Li /*
326*b095b053SXin Li * Signal the event to wake up the threads.
327*b095b053SXin Li * Event in use must be switched after every submitted command to avoid race conditions.
328*b095b053SXin Li * Choose the event based on the high bit of the command, which is flipped on every update.
329*b095b053SXin Li */
330*b095b053SXin Li const uint32_t event_index = (old_command >> 31);
331*b095b053SXin Li const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
332*b095b053SXin Li assert(set_event_status != FALSE);
333*b095b053SXin Li
334*b095b053SXin Li /* Wait until all threads return */
335*b095b053SXin Li for (size_t tid = 1; tid < threads_count; tid++) {
336*b095b053SXin Li const HANDLE thread_handle = threadpool->threads[tid].thread_handle;
337*b095b053SXin Li if (thread_handle != NULL) {
338*b095b053SXin Li const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE);
339*b095b053SXin Li assert(wait_status == WAIT_OBJECT_0);
340*b095b053SXin Li
341*b095b053SXin Li const BOOL close_status = CloseHandle(thread_handle);
342*b095b053SXin Li assert(close_status != FALSE);
343*b095b053SXin Li }
344*b095b053SXin Li }
345*b095b053SXin Li
346*b095b053SXin Li /* Release resources */
347*b095b053SXin Li if (threadpool->execution_mutex != NULL) {
348*b095b053SXin Li const BOOL close_status = CloseHandle(threadpool->execution_mutex);
349*b095b053SXin Li assert(close_status != FALSE);
350*b095b053SXin Li }
351*b095b053SXin Li for (size_t i = 0; i < 2; i++) {
352*b095b053SXin Li if (threadpool->command_event[i] != NULL) {
353*b095b053SXin Li const BOOL close_status = CloseHandle(threadpool->command_event[i]);
354*b095b053SXin Li assert(close_status != FALSE);
355*b095b053SXin Li }
356*b095b053SXin Li if (threadpool->completion_event[i] != NULL) {
357*b095b053SXin Li const BOOL close_status = CloseHandle(threadpool->completion_event[i]);
358*b095b053SXin Li assert(close_status != FALSE);
359*b095b053SXin Li }
360*b095b053SXin Li }
361*b095b053SXin Li }
362*b095b053SXin Li pthreadpool_deallocate(threadpool);
363*b095b053SXin Li }
364*b095b053SXin Li }
365