1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "absl/synchronization/internal/waiter.h"
16
17 #include "absl/base/config.h"
18
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50
51
52 namespace absl {
53 ABSL_NAMESPACE_BEGIN
54 namespace synchronization_internal {
55
MaybeBecomeIdle()56 static void MaybeBecomeIdle() {
57 base_internal::ThreadIdentity *identity =
58 base_internal::CurrentThreadIdentityIfPresent();
59 assert(identity != nullptr);
60 const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
61 const int ticker = identity->ticker.load(std::memory_order_relaxed);
62 const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
63 if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
64 identity->is_idle.store(true, std::memory_order_relaxed);
65 }
66 }
67
68 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
69
Waiter()70 Waiter::Waiter() {
71 futex_.store(0, std::memory_order_relaxed);
72 }
73
Wait(KernelTimeout t)74 bool Waiter::Wait(KernelTimeout t) {
75 // Loop until we can atomically decrement futex from a positive
76 // value, waiting on a futex while we believe it is zero.
77 // Note that, since the thread ticker is just reset, we don't need to check
78 // whether the thread is idle on the very first pass of the loop.
79 bool first_pass = true;
80
81 while (true) {
82 int32_t x = futex_.load(std::memory_order_relaxed);
83 while (x != 0) {
84 if (!futex_.compare_exchange_weak(x, x - 1,
85 std::memory_order_acquire,
86 std::memory_order_relaxed)) {
87 continue; // Raced with someone, retry.
88 }
89 return true; // Consumed a wakeup, we are done.
90 }
91
92 if (!first_pass) MaybeBecomeIdle();
93 const int err = Futex::WaitUntil(&futex_, 0, t);
94 if (err != 0) {
95 if (err == -EINTR || err == -EWOULDBLOCK) {
96 // Do nothing, the loop will retry.
97 } else if (err == -ETIMEDOUT) {
98 return false;
99 } else {
100 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
101 }
102 }
103 first_pass = false;
104 }
105 }
106
Post()107 void Waiter::Post() {
108 if (futex_.fetch_add(1, std::memory_order_release) == 0) {
109 // We incremented from 0, need to wake a potential waiter.
110 Poke();
111 }
112 }
113
Poke()114 void Waiter::Poke() {
115 // Wake one thread waiting on the futex.
116 const int err = Futex::Wake(&futex_, 1);
117 if (ABSL_PREDICT_FALSE(err < 0)) {
118 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
119 }
120 }
121
122 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
123
124 class PthreadMutexHolder {
125 public:
PthreadMutexHolder(pthread_mutex_t * mu)126 explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
127 const int err = pthread_mutex_lock(mu_);
128 if (err != 0) {
129 ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
130 }
131 }
132
133 PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
134 PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
135
~PthreadMutexHolder()136 ~PthreadMutexHolder() {
137 const int err = pthread_mutex_unlock(mu_);
138 if (err != 0) {
139 ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
140 }
141 }
142
143 private:
144 pthread_mutex_t *mu_;
145 };
146
Waiter()147 Waiter::Waiter() {
148 const int err = pthread_mutex_init(&mu_, 0);
149 if (err != 0) {
150 ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
151 }
152
153 const int err2 = pthread_cond_init(&cv_, 0);
154 if (err2 != 0) {
155 ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
156 }
157
158 waiter_count_ = 0;
159 wakeup_count_ = 0;
160 }
161
Wait(KernelTimeout t)162 bool Waiter::Wait(KernelTimeout t) {
163 struct timespec abs_timeout;
164 if (t.has_timeout()) {
165 abs_timeout = t.MakeAbsTimespec();
166 }
167
168 PthreadMutexHolder h(&mu_);
169 ++waiter_count_;
170 // Loop until we find a wakeup to consume or timeout.
171 // Note that, since the thread ticker is just reset, we don't need to check
172 // whether the thread is idle on the very first pass of the loop.
173 bool first_pass = true;
174 while (wakeup_count_ == 0) {
175 if (!first_pass) MaybeBecomeIdle();
176 // No wakeups available, time to wait.
177 if (!t.has_timeout()) {
178 const int err = pthread_cond_wait(&cv_, &mu_);
179 if (err != 0) {
180 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
181 }
182 } else {
183 const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
184 if (err == ETIMEDOUT) {
185 --waiter_count_;
186 return false;
187 }
188 if (err != 0) {
189 ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
190 }
191 }
192 first_pass = false;
193 }
194 // Consume a wakeup and we're done.
195 --wakeup_count_;
196 --waiter_count_;
197 return true;
198 }
199
Post()200 void Waiter::Post() {
201 PthreadMutexHolder h(&mu_);
202 ++wakeup_count_;
203 InternalCondVarPoke();
204 }
205
Poke()206 void Waiter::Poke() {
207 PthreadMutexHolder h(&mu_);
208 InternalCondVarPoke();
209 }
210
InternalCondVarPoke()211 void Waiter::InternalCondVarPoke() {
212 if (waiter_count_ != 0) {
213 const int err = pthread_cond_signal(&cv_);
214 if (ABSL_PREDICT_FALSE(err != 0)) {
215 ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
216 }
217 }
218 }
219
220 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
221
Waiter()222 Waiter::Waiter() {
223 if (sem_init(&sem_, 0, 0) != 0) {
224 ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
225 }
226 wakeups_.store(0, std::memory_order_relaxed);
227 }
228
Wait(KernelTimeout t)229 bool Waiter::Wait(KernelTimeout t) {
230 struct timespec abs_timeout;
231 if (t.has_timeout()) {
232 abs_timeout = t.MakeAbsTimespec();
233 }
234
235 // Loop until we timeout or consume a wakeup.
236 // Note that, since the thread ticker is just reset, we don't need to check
237 // whether the thread is idle on the very first pass of the loop.
238 bool first_pass = true;
239 while (true) {
240 int x = wakeups_.load(std::memory_order_relaxed);
241 while (x != 0) {
242 if (!wakeups_.compare_exchange_weak(x, x - 1,
243 std::memory_order_acquire,
244 std::memory_order_relaxed)) {
245 continue; // Raced with someone, retry.
246 }
247 // Successfully consumed a wakeup, we're done.
248 return true;
249 }
250
251 if (!first_pass) MaybeBecomeIdle();
252 // Nothing to consume, wait (looping on EINTR).
253 while (true) {
254 if (!t.has_timeout()) {
255 if (sem_wait(&sem_) == 0) break;
256 if (errno == EINTR) continue;
257 ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
258 } else {
259 if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
260 if (errno == EINTR) continue;
261 if (errno == ETIMEDOUT) return false;
262 ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
263 }
264 }
265 first_pass = false;
266 }
267 }
268
Post()269 void Waiter::Post() {
270 // Post a wakeup.
271 if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
272 // We incremented from 0, need to wake a potential waiter.
273 Poke();
274 }
275 }
276
Poke()277 void Waiter::Poke() {
278 if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
279 ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
280 }
281 }
282
283 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
284
285 class Waiter::WinHelper {
286 public:
GetLock(Waiter * w)287 static SRWLOCK *GetLock(Waiter *w) {
288 return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
289 }
290
GetCond(Waiter * w)291 static CONDITION_VARIABLE *GetCond(Waiter *w) {
292 return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
293 }
294
295 static_assert(sizeof(SRWLOCK) == sizeof(void *),
296 "`mu_storage_` does not have the same size as SRWLOCK");
297 static_assert(alignof(SRWLOCK) == alignof(void *),
298 "`mu_storage_` does not have the same alignment as SRWLOCK");
299
300 static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
301 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
302 "as `CONDITION_VARIABLE`");
303 static_assert(
304 alignof(CONDITION_VARIABLE) == alignof(void *),
305 "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
306
307 // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
308 // and destructible because we never call their constructors or destructors.
309 static_assert(std::is_trivially_constructible<SRWLOCK>::value,
310 "The `SRWLOCK` type must be trivially constructible");
311 static_assert(
312 std::is_trivially_constructible<CONDITION_VARIABLE>::value,
313 "The `CONDITION_VARIABLE` type must be trivially constructible");
314 static_assert(std::is_trivially_destructible<SRWLOCK>::value,
315 "The `SRWLOCK` type must be trivially destructible");
316 static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
317 "The `CONDITION_VARIABLE` type must be trivially destructible");
318 };
319
320 class LockHolder {
321 public:
LockHolder(SRWLOCK * mu)322 explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
323 AcquireSRWLockExclusive(mu_);
324 }
325
326 LockHolder(const LockHolder&) = delete;
327 LockHolder& operator=(const LockHolder&) = delete;
328
~LockHolder()329 ~LockHolder() {
330 ReleaseSRWLockExclusive(mu_);
331 }
332
333 private:
334 SRWLOCK* mu_;
335 };
336
Waiter()337 Waiter::Waiter() {
338 auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
339 auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
340 InitializeSRWLock(mu);
341 InitializeConditionVariable(cv);
342 waiter_count_ = 0;
343 wakeup_count_ = 0;
344 }
345
Wait(KernelTimeout t)346 bool Waiter::Wait(KernelTimeout t) {
347 SRWLOCK *mu = WinHelper::GetLock(this);
348 CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
349
350 LockHolder h(mu);
351 ++waiter_count_;
352
353 // Loop until we find a wakeup to consume or timeout.
354 // Note that, since the thread ticker is just reset, we don't need to check
355 // whether the thread is idle on the very first pass of the loop.
356 bool first_pass = true;
357 while (wakeup_count_ == 0) {
358 if (!first_pass) MaybeBecomeIdle();
359 // No wakeups available, time to wait.
360 if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
361 // GetLastError() returns a Win32 DWORD, but we assign to
362 // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
363 // initialization guarantees this is not a narrowing conversion.
364 const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
365 if (err == ERROR_TIMEOUT) {
366 --waiter_count_;
367 return false;
368 } else {
369 ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
370 }
371 }
372 first_pass = false;
373 }
374 // Consume a wakeup and we're done.
375 --wakeup_count_;
376 --waiter_count_;
377 return true;
378 }
379
Post()380 void Waiter::Post() {
381 LockHolder h(WinHelper::GetLock(this));
382 ++wakeup_count_;
383 InternalCondVarPoke();
384 }
385
Poke()386 void Waiter::Poke() {
387 LockHolder h(WinHelper::GetLock(this));
388 InternalCondVarPoke();
389 }
390
InternalCondVarPoke()391 void Waiter::InternalCondVarPoke() {
392 if (waiter_count_ != 0) {
393 WakeConditionVariable(WinHelper::GetCond(this));
394 }
395 }
396
397 #else
398 #error Unknown ABSL_WAITER_MODE
399 #endif
400
401 } // namespace synchronization_internal
402 ABSL_NAMESPACE_END
403 } // namespace absl
404