xref: /aosp_15_r20/external/pigweed/pw_sync/condition_variable_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_sync/condition_variable.h"
16 
17 #include <chrono>
18 #include <functional>
19 
20 #include "pw_containers/vector.h"
21 #include "pw_sync/mutex.h"
22 #include "pw_sync/timed_thread_notification.h"
23 #include "pw_thread/non_portable_test_thread_options.h"
24 #include "pw_thread/sleep.h"
25 #include "pw_thread/thread.h"
26 #include "pw_unit_test/framework.h"
27 
28 namespace pw::sync {
29 namespace {
30 
31 using namespace std::chrono_literals;
32 
33 // A timeout for tests where successful behaviour involves waiting.
34 constexpr auto kRequiredTimeout = 100ms;
35 
36 // Maximum extra wait time allowed for test that ensure something waits for
37 // `kRequiredTimeout`.
38 const auto kAllowedSlack = kRequiredTimeout * 1.5;
39 
40 // A timeout that should only be hit if something goes wrong.
41 constexpr auto kFailureTimeout = 5s;
42 
43 using StateLock = std::unique_lock<Mutex>;
44 
45 struct ThreadInfo {
ThreadInfopw::sync::__anon4908bbcc0111::ThreadInfo46   explicit ThreadInfo(int id) : thread_id(id) {}
47 
48   // waiting_notifier is signalled in predicates to indicate that the predicate
49   // has been evaluated. This guarantees (via insider information) that the
50   // thread will acquire the internal ThreadNotification.
51   TimedThreadNotification waiting_notifier;
52 
53   // Signals when the worker thread is done.
54   TimedThreadNotification done_notifier;
55 
56   // The result of the predicate the worker thread uses with wait*(). Set from
57   // the main test thread and read by the worker thread.
58   bool predicate_result = false;
59 
60   // Stores the result of ConditionVariable::wait_for() or ::wait_until() for
61   // use in test asserts.
62   bool wait_result = false;
63 
64   // For use in recording the order in which threads block on a condition.
65   const int thread_id;
66 
67   // Returns a function which will return the current value of
68   //`predicate_result` and release `waiting_notifier`.
Predicatepw::sync::__anon4908bbcc0111::ThreadInfo69   std::function<bool()> Predicate() {
70     return [this]() {
71       bool result = this->predicate_result;
72       this->waiting_notifier.release();
73       return result;
74     };
75   }
76 };
77 
78 // A `ThreadCore` implementation that delegates to an `std::function`.
79 class LambdaThreadCore : public pw::thread::ThreadCore {
80  public:
LambdaThreadCore(std::function<void ()> work)81   explicit LambdaThreadCore(std::function<void()> work)
82       : work_(std::move(work)) {}
83 
84  private:
Run()85   void Run() override { work_(); }
86 
87   std::function<void()> work_;
88 };
89 
90 class LambdaThread {
91  public:
92   // Starts a new thread which runs `work`, joining the thread on destruction.
LambdaThread(std::function<void ()> work,pw::thread::Options options=pw::thread::test::TestOptionsThread0 ())93   explicit LambdaThread(
94       std::function<void()> work,
95       // TODO: b/290860904 - Replace TestOptionsThread0 with TestThreadContext.
96       pw::thread::Options options = pw::thread::test::TestOptionsThread0())
97       : thread_core_(std::move(work)), thread_(options, thread_core_) {}
~LambdaThread()98   ~LambdaThread() { thread_.join(); }
99   LambdaThread(const LambdaThread&) = delete;
100   LambdaThread(LambdaThread&&) = delete;
101   LambdaThread& operator=(const LambdaThread&) = delete;
102   LambdaThread&& operator=(LambdaThread&&) = delete;
103 
104  private:
105   LambdaThreadCore thread_core_;
106   pw::Thread thread_;
107 };
108 
TEST(Wait,PredicateTrueNoWait)109 TEST(Wait, PredicateTrueNoWait) {
110   Mutex mutex;
111   ConditionVariable condvar;
112   ThreadInfo thread_info(0);
113 
114   LambdaThread thread([&mutex, &condvar, &info = thread_info] {
115     StateLock l{mutex};
116     condvar.wait(l, [] { return true; });
117 
118     info.done_notifier.release();
119   });
120   EXPECT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
121 }
122 
TEST(NotifyOne,BlocksUntilSignaled)123 TEST(NotifyOne, BlocksUntilSignaled) {
124   Mutex mutex;
125   ConditionVariable condvar;
126   ThreadInfo thread_info(0);
127 
128   LambdaThread thread([&mutex, &condvar, &info = thread_info] {
129     StateLock l{mutex};
130     condvar.wait(l, info.Predicate());
131     info.done_notifier.release();
132   });
133   ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
134   {
135     StateLock l{mutex};
136     thread_info.predicate_result = true;
137   }
138   condvar.notify_one();
139   ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
140 }
141 
TEST(NotifyOne,UnblocksOne)142 TEST(NotifyOne, UnblocksOne) {
143   Mutex mutex;
144   ConditionVariable condvar;
145   std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)};
146   pw::Vector<int, 2> wait_order;
147 
148   LambdaThread thread_1(
149       [&mutex, &condvar, &info = thread_info[0], &wait_order] {
150         StateLock l{mutex};
151         auto predicate = [&info, &wait_order] {
152           wait_order.push_back(info.thread_id);
153           auto result = info.predicate_result;
154           info.waiting_notifier.release();
155           return result;
156         };
157         condvar.wait(l, predicate);
158         info.done_notifier.release();
159       },
160       pw::thread::test::TestOptionsThread0());
161   LambdaThread thread_2(
162       [&mutex, &condvar, &info = thread_info[1], &wait_order] {
163         StateLock l{mutex};
164         auto predicate = [&info, &wait_order] {
165           wait_order.push_back(info.thread_id);
166           auto result = info.predicate_result;
167           info.waiting_notifier.release();
168           return result;
169         };
170         condvar.wait(l, predicate);
171         info.done_notifier.release();
172       },
173       pw::thread::test::TestOptionsThread1());
174 
175   ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout));
176   ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout));
177 
178   {
179     StateLock l{mutex};
180     thread_info[1].predicate_result = true;
181     thread_info[0].predicate_result = true;
182   }
183   condvar.notify_one();
184   ASSERT_TRUE(thread_info[wait_order[0]].done_notifier.try_acquire_for(
185       kFailureTimeout));
186   ASSERT_FALSE(thread_info[wait_order[0]].done_notifier.try_acquire());
187   condvar.notify_one();
188   ASSERT_TRUE(thread_info[wait_order[1]].done_notifier.try_acquire_for(
189       kFailureTimeout));
190 }
191 
TEST(NotifyAll,UnblocksMultiple)192 TEST(NotifyAll, UnblocksMultiple) {
193   Mutex mutex;
194   ConditionVariable condvar;
195   std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)};
196 
197   LambdaThread thread_1(
198       [&mutex, &condvar, &info = thread_info[0]] {
199         StateLock l{mutex};
200         condvar.wait(l, info.Predicate());
201         info.done_notifier.release();
202       },
203       pw::thread::test::TestOptionsThread0());
204   LambdaThread thread_2(
205       [&mutex, &condvar, &info = thread_info[1]] {
206         StateLock l{mutex};
207         condvar.wait(l, info.Predicate());
208         info.done_notifier.release();
209       },
210       pw::thread::test::TestOptionsThread1());
211 
212   ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout));
213   ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout));
214   {
215     StateLock l{mutex};
216     thread_info[0].predicate_result = true;
217     thread_info[1].predicate_result = true;
218   }
219   condvar.notify_all();
220   ASSERT_TRUE(thread_info[0].done_notifier.try_acquire_for(kFailureTimeout));
221   ASSERT_TRUE(thread_info[1].done_notifier.try_acquire_for(kFailureTimeout));
222 }
223 
TEST(WaitFor,ReturnsTrueIfSignalled)224 TEST(WaitFor, ReturnsTrueIfSignalled) {
225   Mutex mutex;
226   ConditionVariable condvar;
227   ThreadInfo thread_info(0);
228 
229   LambdaThread thread([&mutex, &condvar, &info = thread_info] {
230     StateLock l{mutex};
231     info.wait_result = condvar.wait_for(l, kFailureTimeout, info.Predicate());
232     info.done_notifier.release();
233   });
234 
235   ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
236   {
237     StateLock l{mutex};
238     thread_info.predicate_result = true;
239   }
240   condvar.notify_one();
241   ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
242   ASSERT_TRUE(thread_info.wait_result);
243 }
244 
TEST(WaitFor,ReturnsFalseIfTimesOut)245 TEST(WaitFor, ReturnsFalseIfTimesOut) {
246   Mutex mutex;
247   ConditionVariable condvar;
248   ThreadInfo thread_info(0);
249 
250   LambdaThread thread([&mutex, &condvar, &info = thread_info] {
251     StateLock l{mutex};
252     info.wait_result = condvar.wait_for(l, 0ms, info.Predicate());
253     info.done_notifier.release();
254   });
255 
256   ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
257   ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
258   ASSERT_FALSE(thread_info.wait_result);
259 }
260 
261 // NOTE: This test waits even in successful circumstances.
TEST(WaitFor,TimeoutApproximatelyCorrect)262 TEST(WaitFor, TimeoutApproximatelyCorrect) {
263   Mutex mutex;
264   ConditionVariable condvar;
265   ThreadInfo thread_info(0);
266   pw::chrono::SystemClock::duration wait_duration{};
267 
268   LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] {
269     StateLock l{mutex};
270     auto start = pw::chrono::SystemClock::now();
271     info.wait_result = condvar.wait_for(l, kRequiredTimeout, info.Predicate());
272     wait_duration = pw::chrono::SystemClock::now() - start;
273     info.done_notifier.release();
274   });
275 
276   ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
277   // Wake up thread multiple times. Make sure the timeout is observed.
278   for (int i = 0; i < 5; ++i) {
279     condvar.notify_one();
280     pw::this_thread::sleep_for(kRequiredTimeout / 6);
281   }
282   ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
283   EXPECT_FALSE(thread_info.wait_result);
284   EXPECT_GE(wait_duration, kRequiredTimeout);
285   EXPECT_LT(wait_duration, (kRequiredTimeout + kAllowedSlack));
286 }
287 
TEST(WaitUntil,ReturnsTrueIfSignalled)288 TEST(WaitUntil, ReturnsTrueIfSignalled) {
289   Mutex mutex;
290   ConditionVariable condvar;
291   ThreadInfo thread_info(0);
292 
293   LambdaThread thread([&mutex, &condvar, &info = thread_info] {
294     StateLock l{mutex};
295     info.wait_result = condvar.wait_until(
296         l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate());
297     info.done_notifier.release();
298   });
299 
300   ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
301   {
302     StateLock l{mutex};
303     thread_info.predicate_result = true;
304   }
305   condvar.notify_one();
306   ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
307   ASSERT_TRUE(thread_info.wait_result);
308 }
309 
310 // NOTE: This test waits even in successful circumstances.
TEST(WaitUntil,ReturnsFalseIfTimesOut)311 TEST(WaitUntil, ReturnsFalseIfTimesOut) {
312   Mutex mutex;
313   ConditionVariable condvar;
314   ThreadInfo thread_info(0);
315 
316   LambdaThread thread([&mutex, &condvar, &info = thread_info] {
317     StateLock l{mutex};
318     info.wait_result = condvar.wait_until(
319         l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate());
320     info.done_notifier.release();
321   });
322 
323   ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
324   ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
325   ASSERT_FALSE(thread_info.wait_result);
326 }
327 
328 // NOTE: This test waits even in successful circumstances.
TEST(WaitUntil,TimeoutApproximatelyCorrect)329 TEST(WaitUntil, TimeoutApproximatelyCorrect) {
330   Mutex mutex;
331   ConditionVariable condvar;
332   ThreadInfo thread_info(0);
333   pw::chrono::SystemClock::duration wait_duration{};
334 
335   LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] {
336     StateLock l{mutex};
337     auto start = pw::chrono::SystemClock::now();
338     info.wait_result = condvar.wait_until(
339         l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate());
340     wait_duration = pw::chrono::SystemClock::now() - start;
341     info.done_notifier.release();
342   });
343 
344   ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout));
345   // Wake up thread multiple times. Make sure the timeout is observed.
346   for (int i = 0; i < 5; ++i) {
347     condvar.notify_one();
348     pw::this_thread::sleep_for(kRequiredTimeout / 6);
349   }
350   ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout));
351   ASSERT_FALSE(thread_info.wait_result);
352   ASSERT_GE(wait_duration, kRequiredTimeout);
353   ASSERT_LE(wait_duration, kRequiredTimeout + kAllowedSlack);
354 }
355 
356 }  // namespace
357 }  // namespace pw::sync
358