1*b2055c35SXin Li // Copyright 2011 Google Inc. All Rights Reserved.
2*b2055c35SXin Li //
3*b2055c35SXin Li // Use of this source code is governed by a BSD-style license
4*b2055c35SXin Li // that can be found in the COPYING file in the root of the source
5*b2055c35SXin Li // tree. An additional intellectual property rights grant can be found
6*b2055c35SXin Li // in the file PATENTS. All contributing project authors may
7*b2055c35SXin Li // be found in the AUTHORS file in the root of the source tree.
8*b2055c35SXin Li // -----------------------------------------------------------------------------
9*b2055c35SXin Li //
10*b2055c35SXin Li // Multi-threaded worker
11*b2055c35SXin Li //
12*b2055c35SXin Li // Author: Skal ([email protected])
13*b2055c35SXin Li
14*b2055c35SXin Li #include <assert.h>
15*b2055c35SXin Li #include <string.h> // for memset()
16*b2055c35SXin Li #include "src/utils/thread_utils.h"
17*b2055c35SXin Li #include "src/utils/utils.h"
18*b2055c35SXin Li
19*b2055c35SXin Li #ifdef WEBP_USE_THREAD
20*b2055c35SXin Li
21*b2055c35SXin Li #if defined(_WIN32)
22*b2055c35SXin Li
23*b2055c35SXin Li #include <windows.h>
24*b2055c35SXin Li typedef HANDLE pthread_t;
25*b2055c35SXin Li typedef CRITICAL_SECTION pthread_mutex_t;
26*b2055c35SXin Li
27*b2055c35SXin Li #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater
28*b2055c35SXin Li #define USE_WINDOWS_CONDITION_VARIABLE
29*b2055c35SXin Li typedef CONDITION_VARIABLE pthread_cond_t;
30*b2055c35SXin Li #else
31*b2055c35SXin Li typedef struct {
32*b2055c35SXin Li HANDLE waiting_sem_;
33*b2055c35SXin Li HANDLE received_sem_;
34*b2055c35SXin Li HANDLE signal_event_;
35*b2055c35SXin Li } pthread_cond_t;
36*b2055c35SXin Li #endif // _WIN32_WINNT >= 0x600
37*b2055c35SXin Li
38*b2055c35SXin Li #ifndef WINAPI_FAMILY_PARTITION
39*b2055c35SXin Li #define WINAPI_PARTITION_DESKTOP 1
40*b2055c35SXin Li #define WINAPI_FAMILY_PARTITION(x) x
41*b2055c35SXin Li #endif
42*b2055c35SXin Li
43*b2055c35SXin Li #if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
44*b2055c35SXin Li #define USE_CREATE_THREAD
45*b2055c35SXin Li #endif
46*b2055c35SXin Li
47*b2055c35SXin Li #else // !_WIN32
48*b2055c35SXin Li
49*b2055c35SXin Li #include <pthread.h>
50*b2055c35SXin Li
51*b2055c35SXin Li #endif // _WIN32
52*b2055c35SXin Li
53*b2055c35SXin Li typedef struct {
54*b2055c35SXin Li pthread_mutex_t mutex_;
55*b2055c35SXin Li pthread_cond_t condition_;
56*b2055c35SXin Li pthread_t thread_;
57*b2055c35SXin Li } WebPWorkerImpl;
58*b2055c35SXin Li
59*b2055c35SXin Li #if defined(_WIN32)
60*b2055c35SXin Li
61*b2055c35SXin Li //------------------------------------------------------------------------------
62*b2055c35SXin Li // simplistic pthread emulation layer
63*b2055c35SXin Li
64*b2055c35SXin Li #include <process.h>
65*b2055c35SXin Li
66*b2055c35SXin Li // _beginthreadex requires __stdcall
67*b2055c35SXin Li #define THREADFN unsigned int __stdcall
68*b2055c35SXin Li #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
69*b2055c35SXin Li
70*b2055c35SXin Li #if _WIN32_WINNT >= 0x0501 // Windows XP or greater
71*b2055c35SXin Li #define WaitForSingleObject(obj, timeout) \
72*b2055c35SXin Li WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)
73*b2055c35SXin Li #endif
74*b2055c35SXin Li
pthread_create(pthread_t * const thread,const void * attr,unsigned int (__stdcall * start)(void *),void * arg)75*b2055c35SXin Li static int pthread_create(pthread_t* const thread, const void* attr,
76*b2055c35SXin Li unsigned int (__stdcall* start)(void*), void* arg) {
77*b2055c35SXin Li (void)attr;
78*b2055c35SXin Li #ifdef USE_CREATE_THREAD
79*b2055c35SXin Li *thread = CreateThread(NULL, /* lpThreadAttributes */
80*b2055c35SXin Li 0, /* dwStackSize */
81*b2055c35SXin Li start,
82*b2055c35SXin Li arg,
83*b2055c35SXin Li 0, /* dwStackSize */
84*b2055c35SXin Li NULL); /* lpThreadId */
85*b2055c35SXin Li #else
86*b2055c35SXin Li *thread = (pthread_t)_beginthreadex(NULL, /* void *security */
87*b2055c35SXin Li 0, /* unsigned stack_size */
88*b2055c35SXin Li start,
89*b2055c35SXin Li arg,
90*b2055c35SXin Li 0, /* unsigned initflag */
91*b2055c35SXin Li NULL); /* unsigned *thrdaddr */
92*b2055c35SXin Li #endif
93*b2055c35SXin Li if (*thread == NULL) return 1;
94*b2055c35SXin Li SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
95*b2055c35SXin Li return 0;
96*b2055c35SXin Li }
97*b2055c35SXin Li
pthread_join(pthread_t thread,void ** value_ptr)98*b2055c35SXin Li static int pthread_join(pthread_t thread, void** value_ptr) {
99*b2055c35SXin Li (void)value_ptr;
100*b2055c35SXin Li return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
101*b2055c35SXin Li CloseHandle(thread) == 0);
102*b2055c35SXin Li }
103*b2055c35SXin Li
104*b2055c35SXin Li // Mutex
pthread_mutex_init(pthread_mutex_t * const mutex,void * mutexattr)105*b2055c35SXin Li static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
106*b2055c35SXin Li (void)mutexattr;
107*b2055c35SXin Li #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater
108*b2055c35SXin Li InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);
109*b2055c35SXin Li #else
110*b2055c35SXin Li InitializeCriticalSection(mutex);
111*b2055c35SXin Li #endif
112*b2055c35SXin Li return 0;
113*b2055c35SXin Li }
114*b2055c35SXin Li
pthread_mutex_lock(pthread_mutex_t * const mutex)115*b2055c35SXin Li static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
116*b2055c35SXin Li EnterCriticalSection(mutex);
117*b2055c35SXin Li return 0;
118*b2055c35SXin Li }
119*b2055c35SXin Li
pthread_mutex_unlock(pthread_mutex_t * const mutex)120*b2055c35SXin Li static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
121*b2055c35SXin Li LeaveCriticalSection(mutex);
122*b2055c35SXin Li return 0;
123*b2055c35SXin Li }
124*b2055c35SXin Li
pthread_mutex_destroy(pthread_mutex_t * const mutex)125*b2055c35SXin Li static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
126*b2055c35SXin Li DeleteCriticalSection(mutex);
127*b2055c35SXin Li return 0;
128*b2055c35SXin Li }
129*b2055c35SXin Li
130*b2055c35SXin Li // Condition
pthread_cond_destroy(pthread_cond_t * const condition)131*b2055c35SXin Li static int pthread_cond_destroy(pthread_cond_t* const condition) {
132*b2055c35SXin Li int ok = 1;
133*b2055c35SXin Li #ifdef USE_WINDOWS_CONDITION_VARIABLE
134*b2055c35SXin Li (void)condition;
135*b2055c35SXin Li #else
136*b2055c35SXin Li ok &= (CloseHandle(condition->waiting_sem_) != 0);
137*b2055c35SXin Li ok &= (CloseHandle(condition->received_sem_) != 0);
138*b2055c35SXin Li ok &= (CloseHandle(condition->signal_event_) != 0);
139*b2055c35SXin Li #endif
140*b2055c35SXin Li return !ok;
141*b2055c35SXin Li }
142*b2055c35SXin Li
pthread_cond_init(pthread_cond_t * const condition,void * cond_attr)143*b2055c35SXin Li static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
144*b2055c35SXin Li (void)cond_attr;
145*b2055c35SXin Li #ifdef USE_WINDOWS_CONDITION_VARIABLE
146*b2055c35SXin Li InitializeConditionVariable(condition);
147*b2055c35SXin Li #else
148*b2055c35SXin Li condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
149*b2055c35SXin Li condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
150*b2055c35SXin Li condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
151*b2055c35SXin Li if (condition->waiting_sem_ == NULL ||
152*b2055c35SXin Li condition->received_sem_ == NULL ||
153*b2055c35SXin Li condition->signal_event_ == NULL) {
154*b2055c35SXin Li pthread_cond_destroy(condition);
155*b2055c35SXin Li return 1;
156*b2055c35SXin Li }
157*b2055c35SXin Li #endif
158*b2055c35SXin Li return 0;
159*b2055c35SXin Li }
160*b2055c35SXin Li
pthread_cond_signal(pthread_cond_t * const condition)161*b2055c35SXin Li static int pthread_cond_signal(pthread_cond_t* const condition) {
162*b2055c35SXin Li int ok = 1;
163*b2055c35SXin Li #ifdef USE_WINDOWS_CONDITION_VARIABLE
164*b2055c35SXin Li WakeConditionVariable(condition);
165*b2055c35SXin Li #else
166*b2055c35SXin Li if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
167*b2055c35SXin Li // a thread is waiting in pthread_cond_wait: allow it to be notified
168*b2055c35SXin Li ok = SetEvent(condition->signal_event_);
169*b2055c35SXin Li // wait until the event is consumed so the signaler cannot consume
170*b2055c35SXin Li // the event via its own pthread_cond_wait.
171*b2055c35SXin Li ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
172*b2055c35SXin Li WAIT_OBJECT_0);
173*b2055c35SXin Li }
174*b2055c35SXin Li #endif
175*b2055c35SXin Li return !ok;
176*b2055c35SXin Li }
177*b2055c35SXin Li
pthread_cond_wait(pthread_cond_t * const condition,pthread_mutex_t * const mutex)178*b2055c35SXin Li static int pthread_cond_wait(pthread_cond_t* const condition,
179*b2055c35SXin Li pthread_mutex_t* const mutex) {
180*b2055c35SXin Li int ok;
181*b2055c35SXin Li #ifdef USE_WINDOWS_CONDITION_VARIABLE
182*b2055c35SXin Li ok = SleepConditionVariableCS(condition, mutex, INFINITE);
183*b2055c35SXin Li #else
184*b2055c35SXin Li // note that there is a consumer available so the signal isn't dropped in
185*b2055c35SXin Li // pthread_cond_signal
186*b2055c35SXin Li if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1;
187*b2055c35SXin Li // now unlock the mutex so pthread_cond_signal may be issued
188*b2055c35SXin Li pthread_mutex_unlock(mutex);
189*b2055c35SXin Li ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
190*b2055c35SXin Li WAIT_OBJECT_0);
191*b2055c35SXin Li ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
192*b2055c35SXin Li pthread_mutex_lock(mutex);
193*b2055c35SXin Li #endif
194*b2055c35SXin Li return !ok;
195*b2055c35SXin Li }
196*b2055c35SXin Li
197*b2055c35SXin Li #else // !_WIN32
198*b2055c35SXin Li # define THREADFN void*
199*b2055c35SXin Li # define THREAD_RETURN(val) val
200*b2055c35SXin Li #endif // _WIN32
201*b2055c35SXin Li
202*b2055c35SXin Li //------------------------------------------------------------------------------
203*b2055c35SXin Li
ThreadLoop(void * ptr)204*b2055c35SXin Li static THREADFN ThreadLoop(void* ptr) {
205*b2055c35SXin Li WebPWorker* const worker = (WebPWorker*)ptr;
206*b2055c35SXin Li WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
207*b2055c35SXin Li int done = 0;
208*b2055c35SXin Li while (!done) {
209*b2055c35SXin Li pthread_mutex_lock(&impl->mutex_);
210*b2055c35SXin Li while (worker->status_ == OK) { // wait in idling mode
211*b2055c35SXin Li pthread_cond_wait(&impl->condition_, &impl->mutex_);
212*b2055c35SXin Li }
213*b2055c35SXin Li if (worker->status_ == WORK) {
214*b2055c35SXin Li WebPGetWorkerInterface()->Execute(worker);
215*b2055c35SXin Li worker->status_ = OK;
216*b2055c35SXin Li } else if (worker->status_ == NOT_OK) { // finish the worker
217*b2055c35SXin Li done = 1;
218*b2055c35SXin Li }
219*b2055c35SXin Li // signal to the main thread that we're done (for Sync())
220*b2055c35SXin Li // Note the associated mutex does not need to be held when signaling the
221*b2055c35SXin Li // condition. Unlocking the mutex first may improve performance in some
222*b2055c35SXin Li // implementations, avoiding the case where the waiting thread can't
223*b2055c35SXin Li // reacquire the mutex when woken.
224*b2055c35SXin Li pthread_mutex_unlock(&impl->mutex_);
225*b2055c35SXin Li pthread_cond_signal(&impl->condition_);
226*b2055c35SXin Li }
227*b2055c35SXin Li return THREAD_RETURN(NULL); // Thread is finished
228*b2055c35SXin Li }
229*b2055c35SXin Li
230*b2055c35SXin Li // main thread state control
ChangeState(WebPWorker * const worker,WebPWorkerStatus new_status)231*b2055c35SXin Li static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) {
232*b2055c35SXin Li // No-op when attempting to change state on a thread that didn't come up.
233*b2055c35SXin Li // Checking status_ without acquiring the lock first would result in a data
234*b2055c35SXin Li // race.
235*b2055c35SXin Li WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
236*b2055c35SXin Li if (impl == NULL) return;
237*b2055c35SXin Li
238*b2055c35SXin Li pthread_mutex_lock(&impl->mutex_);
239*b2055c35SXin Li if (worker->status_ >= OK) {
240*b2055c35SXin Li // wait for the worker to finish
241*b2055c35SXin Li while (worker->status_ != OK) {
242*b2055c35SXin Li pthread_cond_wait(&impl->condition_, &impl->mutex_);
243*b2055c35SXin Li }
244*b2055c35SXin Li // assign new status and release the working thread if needed
245*b2055c35SXin Li if (new_status != OK) {
246*b2055c35SXin Li worker->status_ = new_status;
247*b2055c35SXin Li // Note the associated mutex does not need to be held when signaling the
248*b2055c35SXin Li // condition. Unlocking the mutex first may improve performance in some
249*b2055c35SXin Li // implementations, avoiding the case where the waiting thread can't
250*b2055c35SXin Li // reacquire the mutex when woken.
251*b2055c35SXin Li pthread_mutex_unlock(&impl->mutex_);
252*b2055c35SXin Li pthread_cond_signal(&impl->condition_);
253*b2055c35SXin Li return;
254*b2055c35SXin Li }
255*b2055c35SXin Li }
256*b2055c35SXin Li pthread_mutex_unlock(&impl->mutex_);
257*b2055c35SXin Li }
258*b2055c35SXin Li
259*b2055c35SXin Li #endif // WEBP_USE_THREAD
260*b2055c35SXin Li
261*b2055c35SXin Li //------------------------------------------------------------------------------
262*b2055c35SXin Li
Init(WebPWorker * const worker)263*b2055c35SXin Li static void Init(WebPWorker* const worker) {
264*b2055c35SXin Li memset(worker, 0, sizeof(*worker));
265*b2055c35SXin Li worker->status_ = NOT_OK;
266*b2055c35SXin Li }
267*b2055c35SXin Li
Sync(WebPWorker * const worker)268*b2055c35SXin Li static int Sync(WebPWorker* const worker) {
269*b2055c35SXin Li #ifdef WEBP_USE_THREAD
270*b2055c35SXin Li ChangeState(worker, OK);
271*b2055c35SXin Li #endif
272*b2055c35SXin Li assert(worker->status_ <= OK);
273*b2055c35SXin Li return !worker->had_error;
274*b2055c35SXin Li }
275*b2055c35SXin Li
Reset(WebPWorker * const worker)276*b2055c35SXin Li static int Reset(WebPWorker* const worker) {
277*b2055c35SXin Li int ok = 1;
278*b2055c35SXin Li worker->had_error = 0;
279*b2055c35SXin Li if (worker->status_ < OK) {
280*b2055c35SXin Li #ifdef WEBP_USE_THREAD
281*b2055c35SXin Li WebPWorkerImpl* const impl =
282*b2055c35SXin Li (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl));
283*b2055c35SXin Li worker->impl_ = (void*)impl;
284*b2055c35SXin Li if (worker->impl_ == NULL) {
285*b2055c35SXin Li return 0;
286*b2055c35SXin Li }
287*b2055c35SXin Li if (pthread_mutex_init(&impl->mutex_, NULL)) {
288*b2055c35SXin Li goto Error;
289*b2055c35SXin Li }
290*b2055c35SXin Li if (pthread_cond_init(&impl->condition_, NULL)) {
291*b2055c35SXin Li pthread_mutex_destroy(&impl->mutex_);
292*b2055c35SXin Li goto Error;
293*b2055c35SXin Li }
294*b2055c35SXin Li pthread_mutex_lock(&impl->mutex_);
295*b2055c35SXin Li ok = !pthread_create(&impl->thread_, NULL, ThreadLoop, worker);
296*b2055c35SXin Li if (ok) worker->status_ = OK;
297*b2055c35SXin Li pthread_mutex_unlock(&impl->mutex_);
298*b2055c35SXin Li if (!ok) {
299*b2055c35SXin Li pthread_mutex_destroy(&impl->mutex_);
300*b2055c35SXin Li pthread_cond_destroy(&impl->condition_);
301*b2055c35SXin Li Error:
302*b2055c35SXin Li WebPSafeFree(impl);
303*b2055c35SXin Li worker->impl_ = NULL;
304*b2055c35SXin Li return 0;
305*b2055c35SXin Li }
306*b2055c35SXin Li #else
307*b2055c35SXin Li worker->status_ = OK;
308*b2055c35SXin Li #endif
309*b2055c35SXin Li } else if (worker->status_ > OK) {
310*b2055c35SXin Li ok = Sync(worker);
311*b2055c35SXin Li }
312*b2055c35SXin Li assert(!ok || (worker->status_ == OK));
313*b2055c35SXin Li return ok;
314*b2055c35SXin Li }
315*b2055c35SXin Li
Execute(WebPWorker * const worker)316*b2055c35SXin Li static void Execute(WebPWorker* const worker) {
317*b2055c35SXin Li if (worker->hook != NULL) {
318*b2055c35SXin Li worker->had_error |= !worker->hook(worker->data1, worker->data2);
319*b2055c35SXin Li }
320*b2055c35SXin Li }
321*b2055c35SXin Li
Launch(WebPWorker * const worker)322*b2055c35SXin Li static void Launch(WebPWorker* const worker) {
323*b2055c35SXin Li #ifdef WEBP_USE_THREAD
324*b2055c35SXin Li ChangeState(worker, WORK);
325*b2055c35SXin Li #else
326*b2055c35SXin Li Execute(worker);
327*b2055c35SXin Li #endif
328*b2055c35SXin Li }
329*b2055c35SXin Li
End(WebPWorker * const worker)330*b2055c35SXin Li static void End(WebPWorker* const worker) {
331*b2055c35SXin Li #ifdef WEBP_USE_THREAD
332*b2055c35SXin Li if (worker->impl_ != NULL) {
333*b2055c35SXin Li WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
334*b2055c35SXin Li ChangeState(worker, NOT_OK);
335*b2055c35SXin Li pthread_join(impl->thread_, NULL);
336*b2055c35SXin Li pthread_mutex_destroy(&impl->mutex_);
337*b2055c35SXin Li pthread_cond_destroy(&impl->condition_);
338*b2055c35SXin Li WebPSafeFree(impl);
339*b2055c35SXin Li worker->impl_ = NULL;
340*b2055c35SXin Li }
341*b2055c35SXin Li #else
342*b2055c35SXin Li worker->status_ = NOT_OK;
343*b2055c35SXin Li assert(worker->impl_ == NULL);
344*b2055c35SXin Li #endif
345*b2055c35SXin Li assert(worker->status_ == NOT_OK);
346*b2055c35SXin Li }
347*b2055c35SXin Li
348*b2055c35SXin Li //------------------------------------------------------------------------------
349*b2055c35SXin Li
350*b2055c35SXin Li static WebPWorkerInterface g_worker_interface = {
351*b2055c35SXin Li Init, Reset, Sync, Launch, Execute, End
352*b2055c35SXin Li };
353*b2055c35SXin Li
WebPSetWorkerInterface(const WebPWorkerInterface * const winterface)354*b2055c35SXin Li int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {
355*b2055c35SXin Li if (winterface == NULL ||
356*b2055c35SXin Li winterface->Init == NULL || winterface->Reset == NULL ||
357*b2055c35SXin Li winterface->Sync == NULL || winterface->Launch == NULL ||
358*b2055c35SXin Li winterface->Execute == NULL || winterface->End == NULL) {
359*b2055c35SXin Li return 0;
360*b2055c35SXin Li }
361*b2055c35SXin Li g_worker_interface = *winterface;
362*b2055c35SXin Li return 1;
363*b2055c35SXin Li }
364*b2055c35SXin Li
WebPGetWorkerInterface(void)365*b2055c35SXin Li const WebPWorkerInterface* WebPGetWorkerInterface(void) {
366*b2055c35SXin Li return &g_worker_interface;
367*b2055c35SXin Li }
368*b2055c35SXin Li
369*b2055c35SXin Li //------------------------------------------------------------------------------
370