xref: /aosp_15_r20/external/pthreadpool/src/windows.c (revision b095b0533730c2930f947df924a4486d266faa1a)
1 /* Standard C headers */
2 #include <assert.h>
3 #include <stdbool.h>
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7 
8 /* Configuration header */
9 #include "threadpool-common.h"
10 
11 /* Windows headers */
12 #include <windows.h>
13 
14 /* Public library header */
15 #include <pthreadpool.h>
16 
17 /* Internal library headers */
18 #include "threadpool-atomics.h"
19 #include "threadpool-object.h"
20 #include "threadpool-utils.h"
21 
22 
checkin_worker_thread(struct pthreadpool * threadpool,uint32_t event_index)23 static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) {
24 	if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
25 		SetEvent(threadpool->completion_event[event_index]);
26 	}
27 }
28 
wait_worker_threads(struct pthreadpool * threadpool,uint32_t event_index)29 static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) {
30 	/* Initial check */
31 	size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
32 	if (active_threads == 0) {
33 		return;
34 	}
35 
36 	/* Spin-wait */
37 	for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
38 		pthreadpool_yield();
39 
40 		active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
41 		if (active_threads == 0) {
42 			return;
43 		}
44 	}
45 
46 	/* Fall-back to event wait */
47 	const DWORD wait_status = WaitForSingleObject(threadpool->completion_event[event_index], INFINITE);
48 	assert(wait_status == WAIT_OBJECT_0);
49 	assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0);
50 }
51 
wait_for_new_command(struct pthreadpool * threadpool,uint32_t last_command,uint32_t last_flags)52 static uint32_t wait_for_new_command(
53 	struct pthreadpool* threadpool,
54 	uint32_t last_command,
55 	uint32_t last_flags)
56 {
57 	uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
58 	if (command != last_command) {
59 		return command;
60 	}
61 
62 	if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
63 		/* Spin-wait loop */
64 		for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
65 			pthreadpool_yield();
66 
67 			command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
68 			if (command != last_command) {
69 				return command;
70 			}
71 		}
72 	}
73 
74 	/* Spin-wait disabled or timed out, fall back to event wait */
75 	const uint32_t event_index = (last_command >> 31);
76 	const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE);
77 	assert(wait_status == WAIT_OBJECT_0);
78 
79 	command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
80 	assert(command != last_command);
81 	return command;
82 }
83 
/*
 * Entry point for every worker thread created by pthreadpool_create.
 * The argument is the thread's own struct thread_info slot, from which the
 * owning threadpool is recovered.  The thread checks in once at startup
 * (so pthreadpool_create can wait for initialization), then loops forever:
 * wait for a new command, execute it, check in, and remember the command
 * so the next wait can detect a change.
 */
static DWORD WINAPI thread_main(LPVOID arg) {
	struct thread_info* thread = (struct thread_info*) arg;
	struct pthreadpool* threadpool = thread->threadpool;
	uint32_t last_command = threadpool_command_init;
	struct fpu_state saved_fpu_state = { 0 };
	uint32_t flags = 0;

	/* Check in: tells pthreadpool_create this thread finished initializing */
	checkin_worker_thread(threadpool, 0);

	/* Monitor new commands and act accordingly */
	for (;;) {
		uint32_t command = wait_for_new_command(threadpool, last_command, flags);
		/* Make the command parameters written by the master thread visible here */
		pthreadpool_fence_acquire();

		flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);

		/* Process command: low bits carry the command id, high bit selects the event pair */
		switch (command & THREADPOOL_COMMAND_MASK) {
			case threadpool_command_parallelize:
			{
				const thread_function_t thread_function =
					(thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
					/* Save FPU state so it can be restored after the task */
					saved_fpu_state = get_fpu_state();
					disable_fpu_denormals();
				}

				thread_function(threadpool, thread);
				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
					set_fpu_state(saved_fpu_state);
				}
				break;
			}
			case threadpool_command_shutdown:
				/* Exit immediately: the master thread is waiting on pthread_join */
				return 0;
			case threadpool_command_init:
				/* To inhibit compiler warning */
				break;
		}
		/* Notify the master thread that we finished processing.
		   The completion event index is the command's high bit, which the
		   master flips on every command. */
		const uint32_t event_index = command >> 31;
		checkin_worker_thread(threadpool, event_index);
		/* Update last command */
		last_command = command;
	};
	/* Unreachable: the loop only exits via the shutdown return above */
	return 0;
}
133 
/*
 * Create a thread pool with the given number of threads.  A count of zero
 * means "one thread per logical processor" as reported by GetSystemInfo.
 * Returns NULL if allocation fails.  The caller's thread acts as worker #0,
 * so only threads_count - 1 system threads are created.
 */
struct pthreadpool* pthreadpool_create(size_t threads_count) {
	if (threads_count == 0) {
		SYSTEM_INFO system_info;
		ZeroMemory(&system_info, sizeof(system_info));
		GetSystemInfo(&system_info);
		threads_count = (size_t) system_info.dwNumberOfProcessors;
	}

	struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
	if (threadpool == NULL) {
		return NULL;
	}
	/* Pre-computed divisor: pthreadpool_parallelize divides work ranges by the thread count */
	threadpool->threads_count = fxdiv_init_size_t(threads_count);
	for (size_t tid = 0; tid < threads_count; tid++) {
		threadpool->threads[tid].thread_number = tid;
		threadpool->threads[tid].threadpool = threadpool;
	}

	/* Thread pool with a single thread computes everything on the caller thread. */
	if (threads_count > 1) {
		/* NOTE(review): CreateMutexW/CreateEventW results are not checked; a NULL
		   handle here would make later WaitForSingleObject/SetEvent calls fail.
		   Confirm whether failure handling is intentionally omitted. */
		threadpool->execution_mutex = CreateMutexW(
			NULL /* mutex attributes */,
			FALSE /* initially owned */,
			NULL /* name */);
		/* Two command/completion event pairs: consecutive commands alternate
		   between them (selected by the command word's high bit) to avoid races */
		for (size_t i = 0; i < 2; i++) {
			threadpool->completion_event[i] = CreateEventW(
				NULL /* event attributes */,
				TRUE /* manual-reset event: yes */,
				FALSE /* initial state: nonsignaled */,
				NULL /* name */);
			threadpool->command_event[i] = CreateEventW(
				NULL /* event attributes */,
				TRUE /* manual-reset event: yes */,
				FALSE /* initial state: nonsignaled */,
				NULL /* name */);
		}

		/* Must be set before threads start: each new thread decrements it on check-in */
		pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);

		/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
		for (size_t tid = 1; tid < threads_count; tid++) {
			/* NOTE(review): CreateThread can return NULL; if any creation fails,
			   active_threads never reaches zero and wait_worker_threads below
			   would block forever.  Verify against upstream whether this is an
			   accepted limitation. */
			threadpool->threads[tid].thread_handle = CreateThread(
				NULL /* thread attributes */,
				0 /* stack size: default */,
				&thread_main,
				&threadpool->threads[tid],
				0 /* creation flags */,
				NULL /* thread id */);
		}

		/* Wait until all threads initialize */
		wait_worker_threads(threadpool, 0);
	}
	return threadpool;
}
189 
pthreadpool_parallelize(struct pthreadpool * threadpool,thread_function_t thread_function,const void * params,size_t params_size,void * task,void * context,size_t linear_range,uint32_t flags)190 PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
191 	struct pthreadpool* threadpool,
192 	thread_function_t thread_function,
193 	const void* params,
194 	size_t params_size,
195 	void* task,
196 	void* context,
197 	size_t linear_range,
198 	uint32_t flags)
199 {
200 	assert(threadpool != NULL);
201 	assert(thread_function != NULL);
202 	assert(task != NULL);
203 	assert(linear_range > 1);
204 
205 	/* Protect the global threadpool structures */
206 	const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE);
207 	assert(wait_status == WAIT_OBJECT_0);
208 
209 	/* Setup global arguments */
210 	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
211 	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
212 	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
213 	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
214 
215 	const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
216 	pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
217 
218 	if (params_size != 0) {
219 		CopyMemory(&threadpool->params, params, params_size);
220 		pthreadpool_fence_release();
221 	}
222 
223 	/* Spread the work between threads */
224 	const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
225 	size_t range_start = 0;
226 	for (size_t tid = 0; tid < threads_count.value; tid++) {
227 		struct thread_info* thread = &threadpool->threads[tid];
228 		const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
229 		const size_t range_end = range_start + range_length;
230 		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
231 		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
232 		pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
233 
234 		/* The next subrange starts where the previous ended */
235 		range_start = range_end;
236 	}
237 
238 	/*
239 	 * Update the threadpool command.
240 	 * Imporantly, do it after initializing command parameters (range, task, argument, flags)
241 	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
242 	 * to ensure the unmasked command is different then the last command, because worker threads
243 	 * monitor for change in the unmasked command.
244 	 */
245 	const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
246 	const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
247 
248 	/*
249 	 * Reset the command event for the next command.
250 	 * It is important to reset the event before writing out the new command, because as soon as the worker threads
251 	 * observe the new command, they may process it and switch to waiting on the next command event.
252 	 *
253 	 * Note: the event is different from the command event signalled in this update.
254 	 */
255 	const uint32_t event_index = (old_command >> 31);
256 	BOOL reset_event_status = ResetEvent(threadpool->command_event[event_index ^ 1]);
257 	assert(reset_event_status != FALSE);
258 
259 	/*
260 	 * Store the command with release semantics to guarantee that if a worker thread observes
261 	 * the new command value, it also observes the updated command parameters.
262 	 *
263 	 * Note: release semantics is necessary, because the workers might be waiting in a spin-loop
264 	 * rather than on the event object.
265 	 */
266 	pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
267 
268 	/*
269 	 * Signal the event to wake up the threads.
270 	 * Event in use must be switched after every submitted command to avoid race conditions.
271 	 * Choose the event based on the high bit of the command, which is flipped on every update.
272 	 */
273 	const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
274 	assert(set_event_status != FALSE);
275 
276 	/* Save and modify FPU denormals control, if needed */
277 	struct fpu_state saved_fpu_state = { 0 };
278 	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
279 		saved_fpu_state = get_fpu_state();
280 		disable_fpu_denormals();
281 	}
282 
283 	/* Do computations as worker #0 */
284 	thread_function(threadpool, &threadpool->threads[0]);
285 
286 	/* Restore FPU denormals control, if needed */
287 	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
288 		set_fpu_state(saved_fpu_state);
289 	}
290 
291 	/*
292 	 * Wait until the threads finish computation
293 	 * Use the complementary event because it corresponds to the new command.
294 	 */
295 	wait_worker_threads(threadpool, event_index ^ 1);
296 
297 	/*
298 	 * Reset the completion event for the next command.
299 	 * Note: the event is different from the one used for waiting in this update.
300 	 */
301 	reset_event_status = ResetEvent(threadpool->completion_event[event_index]);
302 	assert(reset_event_status != FALSE);
303 
304 	/* Make changes by other threads visible to this thread */
305 	pthreadpool_fence_acquire();
306 
307 	/* Unprotect the global threadpool structures */
308 	const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex);
309 	assert(release_mutex_status != FALSE);
310 }
311 
pthreadpool_destroy(struct pthreadpool * threadpool)312 void pthreadpool_destroy(struct pthreadpool* threadpool) {
313 	if (threadpool != NULL) {
314 		const size_t threads_count = threadpool->threads_count.value;
315 		if (threads_count > 1) {
316 			pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
317 
318 			/*
319 			 * Store the command with release semantics to guarantee that if a worker thread observes
320 			 * the new command value, it also observes the updated active_threads values.
321 			 */
322 			const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
323 			pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
324 
325 			/*
326 			 * Signal the event to wake up the threads.
327 			 * Event in use must be switched after every submitted command to avoid race conditions.
328 			 * Choose the event based on the high bit of the command, which is flipped on every update.
329 			 */
330 			const uint32_t event_index = (old_command >> 31);
331 			const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
332 			assert(set_event_status != FALSE);
333 
334 			/* Wait until all threads return */
335 			for (size_t tid = 1; tid < threads_count; tid++) {
336 				const HANDLE thread_handle = threadpool->threads[tid].thread_handle;
337 				if (thread_handle != NULL) {
338 					const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE);
339 					assert(wait_status == WAIT_OBJECT_0);
340 
341 					const BOOL close_status = CloseHandle(thread_handle);
342 					assert(close_status != FALSE);
343 				}
344 			}
345 
346 			/* Release resources */
347 			if (threadpool->execution_mutex != NULL) {
348 				const BOOL close_status = CloseHandle(threadpool->execution_mutex);
349 				assert(close_status != FALSE);
350 			}
351 			for (size_t i = 0; i < 2; i++) {
352 				if (threadpool->command_event[i] != NULL) {
353 					const BOOL close_status = CloseHandle(threadpool->command_event[i]);
354 					assert(close_status != FALSE);
355 				}
356 				if (threadpool->completion_event[i] != NULL) {
357 					const BOOL close_status = CloseHandle(threadpool->completion_event[i]);
358 					assert(close_status != FALSE);
359 				}
360 			}
361 		}
362 		pthreadpool_deallocate(threadpool);
363 	}
364 }
365