1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_sync/condition_variable.h"
16
17 #include <chrono>
18 #include <functional>
19
20 #include "pw_containers/vector.h"
21 #include "pw_sync/mutex.h"
22 #include "pw_sync/timed_thread_notification.h"
23 #include "pw_thread/non_portable_test_thread_options.h"
24 #include "pw_thread/sleep.h"
25 #include "pw_thread/thread.h"
26 #include "pw_unit_test/framework.h"
27
28 namespace pw::sync {
29 namespace {
30
using namespace std::chrono_literals;

// A timeout for tests where successful behaviour involves waiting for the
// whole duration to elapse.
constexpr auto kRequiredTimeout = 100ms;

// Maximum extra wait time allowed for tests that ensure something waits for
// `kRequiredTimeout`. Multiplying by a floating-point factor yields a
// `std::chrono::duration` with a `double` representation (150ms here).
constexpr auto kAllowedSlack = kRequiredTimeout * 1.5;

// A timeout that should only be hit if something goes wrong.
constexpr auto kFailureTimeout = 5s;
42
// Lock type the tests in this file hold while calling
// `ConditionVariable::wait*()`: a `std::unique_lock` over `Mutex`.
using StateLock = std::unique_lock<Mutex>;
44
45 struct ThreadInfo {
ThreadInfopw::sync::__anon4908bbcc0111::ThreadInfo46 explicit ThreadInfo(int id) : thread_id(id) {}
47
48 // waiting_notifier is signalled in predicates to indicate that the predicate
49 // has been evaluated. This guarantees (via insider information) that the
50 // thread will acquire the internal ThreadNotification.
51 TimedThreadNotification waiting_notifier;
52
53 // Signals when the worker thread is done.
54 TimedThreadNotification done_notifier;
55
56 // The result of the predicate the worker thread uses with wait*(). Set from
57 // the main test thread and read by the worker thread.
58 bool predicate_result = false;
59
60 // Stores the result of ConditionVariable::wait_for() or ::wait_until() for
61 // use in test asserts.
62 bool wait_result = false;
63
64 // For use in recording the order in which threads block on a condition.
65 const int thread_id;
66
67 // Returns a function which will return the current value of
68 //`predicate_result` and release `waiting_notifier`.
Predicatepw::sync::__anon4908bbcc0111::ThreadInfo69 std::function<bool()> Predicate() {
70 return [this]() {
71 bool result = this->predicate_result;
72 this->waiting_notifier.release();
73 return result;
74 };
75 }
76 };
77
78 // A `ThreadCore` implementation that delegates to an `std::function`.
79 class LambdaThreadCore : public pw::thread::ThreadCore {
80 public:
LambdaThreadCore(std::function<void ()> work)81 explicit LambdaThreadCore(std::function<void()> work)
82 : work_(std::move(work)) {}
83
84 private:
Run()85 void Run() override { work_(); }
86
87 std::function<void()> work_;
88 };
89
90 class LambdaThread {
91 public:
92 // Starts a new thread which runs `work`, joining the thread on destruction.
LambdaThread(std::function<void ()> work,pw::thread::Options options=pw::thread::test::TestOptionsThread0 ())93 explicit LambdaThread(
94 std::function<void()> work,
95 // TODO: b/290860904 - Replace TestOptionsThread0 with TestThreadContext.
96 pw::thread::Options options = pw::thread::test::TestOptionsThread0())
97 : thread_core_(std::move(work)), thread_(options, thread_core_) {}
~LambdaThread()98 ~LambdaThread() { thread_.join(); }
99 LambdaThread(const LambdaThread&) = delete;
100 LambdaThread(LambdaThread&&) = delete;
101 LambdaThread& operator=(const LambdaThread&) = delete;
102 LambdaThread&& operator=(LambdaThread&&) = delete;
103
104 private:
105 LambdaThreadCore thread_core_;
106 pw::Thread thread_;
107 };
108
// Checks that `wait()` with an always-true predicate returns, letting the
// worker signal completion well within `kFailureTimeout`.
TEST(Wait, PredicateTrueNoWait) {
  Mutex mutex;
  ConditionVariable condvar;
  ThreadInfo thread_info(0);

  auto worker_body = [&mutex, &condvar, &thread_info] {
    StateLock worker_lock(mutex);
    auto always_true = [] { return true; };
    condvar.wait(worker_lock, always_true);

    thread_info.done_notifier.release();
  };
  // Declared last so that it is joined before the objects above are destroyed.
  LambdaThread thread(worker_body);

  EXPECT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
}
122
// Checks that a worker blocked in `wait()` finishes once the main thread makes
// the predicate true and calls `notify_one()`.
TEST(NotifyOne, BlocksUntilSignaled) {
  Mutex mutex;
  ConditionVariable condvar;
  ThreadInfo thread_info(0);

  LambdaThread thread([&mutex, &condvar, &thread_info] {
    StateLock worker_lock(mutex);
    condvar.wait(worker_lock, thread_info.Predicate());
    thread_info.done_notifier.release();
  });

  // The worker has evaluated its (still false) predicate at least once.
  ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));

  // Flip the predicate while holding the mutex, and drop the lock again before
  // notifying.
  StateLock main_lock(mutex);
  thread_info.predicate_result = true;
  main_lock.unlock();

  condvar.notify_one();
  ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
}
141
// Checks that each `notify_one()` call lets exactly one of two blocked workers
// finish. The workers are expected to be released in the order in which they
// first evaluated their predicate, which is recorded in `wait_order`.
TEST(NotifyOne, UnblocksOne) {
  Mutex mutex;
  ConditionVariable condvar;
  std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)};
  pw::Vector<int, 2> wait_order;

  // Builds the body run by a worker thread that uses `info`. Both workers run
  // identical code and differ only in their ThreadInfo.
  auto make_waiter = [&mutex, &condvar, &wait_order](ThreadInfo& info) {
    return [&mutex, &condvar, &wait_order, &info] {
      StateLock l{mutex};
      // The predicate runs more than once per worker (it is false the first
      // time and only a later evaluation lets `wait()` return), so the id is
      // recorded on the first evaluation only. This keeps `wait_order` at one
      // entry per worker, within its capacity of two, and leaves it unmodified
      // by the time the main thread reads it.
      bool recorded = false;
      auto predicate = [&info, &wait_order, &recorded] {
        if (!recorded) {
          wait_order.push_back(info.thread_id);
          recorded = true;
        }
        const bool result = info.predicate_result;
        info.waiting_notifier.release();
        return result;
      };
      condvar.wait(l, predicate);
      info.done_notifier.release();
    };
  };

  LambdaThread thread_1(make_waiter(thread_info[0]),
                        pw::thread::test::TestOptionsThread0());
  LambdaThread thread_2(make_waiter(thread_info[1]),
                        pw::thread::test::TestOptionsThread1());

  // Both workers have evaluated their predicate, so both ids are recorded.
  ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout));
  ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout));

  {
    StateLock l{mutex};
    thread_info[1].predicate_result = true;
    thread_info[0].predicate_result = true;
  }
  condvar.notify_one();
  ASSERT_TRUE(thread_info[wait_order[0]].done_notifier.try_acquire_for(
      kFailureTimeout));
  // Only one waiter may have been released by the single notify_one(): the
  // other worker must not have finished yet. (Checking the first worker's
  // notifier again here would be vacuous, as it was consumed just above.)
  ASSERT_FALSE(thread_info[wait_order[1]].done_notifier.try_acquire());
  condvar.notify_one();
  ASSERT_TRUE(thread_info[wait_order[1]].done_notifier.try_acquire_for(
      kFailureTimeout));
}
191
// Checks that a single `notify_all()` lets two blocked workers finish once
// both of their predicates have been made true.
TEST(NotifyAll, UnblocksMultiple) {
  Mutex mutex;
  ConditionVariable condvar;
  std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)};

  // Both workers run the same body; only the ThreadInfo they use differs.
  auto make_waiter = [&mutex, &condvar](ThreadInfo& info) {
    return [&mutex, &condvar, &info] {
      StateLock worker_lock(mutex);
      condvar.wait(worker_lock, info.Predicate());
      info.done_notifier.release();
    };
  };

  LambdaThread thread_1(make_waiter(thread_info[0]),
                        pw::thread::test::TestOptionsThread0());
  LambdaThread thread_2(make_waiter(thread_info[1]),
                        pw::thread::test::TestOptionsThread1());

  // Wait until each worker has evaluated its predicate.
  ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout));
  ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout));

  // Make both predicates true under the mutex, then release it before
  // notifying.
  StateLock main_lock(mutex);
  thread_info[0].predicate_result = true;
  thread_info[1].predicate_result = true;
  main_lock.unlock();

  condvar.notify_all();
  ASSERT_TRUE(thread_info[0].done_notifier.try_acquire_for(kFailureTimeout));
  ASSERT_TRUE(thread_info[1].done_notifier.try_acquire_for(kFailureTimeout));
}
223
// Checks that `wait_for()` returns true when the predicate becomes true and
// the waiter is notified before the timeout.
TEST(WaitFor, ReturnsTrueIfSignalled) {
  Mutex mutex;
  ConditionVariable condvar;
  ThreadInfo thread_info(0);

  LambdaThread thread([&mutex, &condvar, &thread_info] {
    StateLock worker_lock(mutex);
    const bool satisfied = condvar.wait_for(
        worker_lock, kFailureTimeout, thread_info.Predicate());
    thread_info.wait_result = satisfied;
    thread_info.done_notifier.release();
  });

  // The worker has evaluated its (still false) predicate at least once.
  ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));

  // Make the predicate true under the mutex, then notify with the lock
  // released.
  StateLock main_lock(mutex);
  thread_info.predicate_result = true;
  main_lock.unlock();

  condvar.notify_one();
  ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
  ASSERT_TRUE(thread_info.wait_result);
}
244
// Checks that `wait_for()` with a zero timeout and a predicate that stays
// false returns false.
TEST(WaitFor, ReturnsFalseIfTimesOut) {
  Mutex mutex;
  ConditionVariable condvar;
  ThreadInfo thread_info(0);

  auto worker_body = [&mutex, &condvar, &thread_info] {
    StateLock worker_lock(mutex);
    // `predicate_result` is never set in this test, so this stays false.
    auto never_true = thread_info.Predicate();
    thread_info.wait_result = condvar.wait_for(worker_lock, 0ms, never_true);
    thread_info.done_notifier.release();
  };
  // Declared last so that it is joined before the objects above are destroyed.
  LambdaThread thread(worker_body);

  ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
  ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
  ASSERT_FALSE(thread_info.wait_result);
}
260
261 // NOTE: This test waits even in successful circumstances.
TEST(WaitFor,TimeoutApproximatelyCorrect)262 TEST(WaitFor, TimeoutApproximatelyCorrect) {
263 Mutex mutex;
264 ConditionVariable condvar;
265 ThreadInfo thread_info(0);
266 pw::chrono::SystemClock::duration wait_duration{};
267
268 LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] {
269 StateLock l{mutex};
270 auto start = pw::chrono::SystemClock::now();
271 info.wait_result = condvar.wait_for(l, kRequiredTimeout, info.Predicate());
272 wait_duration = pw::chrono::SystemClock::now() - start;
273 info.done_notifier.release();
274 });
275
276 ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
277 // Wake up thread multiple times. Make sure the timeout is observed.
278 for (int i = 0; i < 5; ++i) {
279 condvar.notify_one();
280 pw::this_thread::sleep_for(kRequiredTimeout / 6);
281 }
282 ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
283 EXPECT_FALSE(thread_info.wait_result);
284 EXPECT_GE(wait_duration, kRequiredTimeout);
285 EXPECT_LT(wait_duration, (kRequiredTimeout + kAllowedSlack));
286 }
287
// Checks that `wait_until()` returns true when the predicate becomes true and
// the waiter is notified before the deadline.
TEST(WaitUntil, ReturnsTrueIfSignalled) {
  Mutex mutex;
  ConditionVariable condvar;
  ThreadInfo thread_info(0);

  LambdaThread thread([&mutex, &condvar, &info = thread_info] {
    StateLock l{mutex};
    // The deadline is only a safety net here: the worker is expected to be
    // signalled long before it. `kFailureTimeout` (rather than the short
    // `kRequiredTimeout`) keeps a slow scheduler from turning a late
    // notification into a spurious timeout, matching
    // WaitFor.ReturnsTrueIfSignalled.
    info.wait_result = condvar.wait_until(
        l, pw::chrono::SystemClock::now() + kFailureTimeout, info.Predicate());
    info.done_notifier.release();
  });

  // The worker has evaluated its (still false) predicate at least once.
  ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
  {
    // Make the predicate true while holding the mutex.
    StateLock l{mutex};
    thread_info.predicate_result = true;
  }
  condvar.notify_one();
  ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
  ASSERT_TRUE(thread_info.wait_result);
}
309
310 // NOTE: This test waits even in successful circumstances.
TEST(WaitUntil,ReturnsFalseIfTimesOut)311 TEST(WaitUntil, ReturnsFalseIfTimesOut) {
312 Mutex mutex;
313 ConditionVariable condvar;
314 ThreadInfo thread_info(0);
315
316 LambdaThread thread([&mutex, &condvar, &info = thread_info] {
317 StateLock l{mutex};
318 info.wait_result = condvar.wait_until(
319 l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate());
320 info.done_notifier.release();
321 });
322
323 ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
324 ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
325 ASSERT_FALSE(thread_info.wait_result);
326 }
327
328 // NOTE: This test waits even in successful circumstances.
TEST(WaitUntil,TimeoutApproximatelyCorrect)329 TEST(WaitUntil, TimeoutApproximatelyCorrect) {
330 Mutex mutex;
331 ConditionVariable condvar;
332 ThreadInfo thread_info(0);
333 pw::chrono::SystemClock::duration wait_duration{};
334
335 LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] {
336 StateLock l{mutex};
337 auto start = pw::chrono::SystemClock::now();
338 info.wait_result = condvar.wait_until(
339 l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate());
340 wait_duration = pw::chrono::SystemClock::now() - start;
341 info.done_notifier.release();
342 });
343
344 ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
345 // Wake up thread multiple times. Make sure the timeout is observed.
346 for (int i = 0; i < 5; ++i) {
347 condvar.notify_one();
348 pw::this_thread::sleep_for(kRequiredTimeout / 6);
349 }
350 ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
351 ASSERT_FALSE(thread_info.wait_result);
352 ASSERT_GE(wait_duration, kRequiredTimeout);
353 ASSERT_LE(wait_duration, kRequiredTimeout + kAllowedSlack);
354 }
355
356 } // namespace
357 } // namespace pw::sync
358