1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/activity.h"
16
17 #include <stdlib.h>
18
19 #include <functional>
20 #include <tuple>
21 #include <variant>
22
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25
26 #include "src/core/lib/promise/join.h"
27 #include "src/core/lib/promise/poll.h"
28 #include "src/core/lib/promise/promise.h"
29 #include "src/core/lib/promise/seq.h"
30 #include "src/core/lib/promise/wait_set.h"
31 #include "test/core/promise/test_wakeup_schedulers.h"
32
33 using testing::_;
34 using testing::Mock;
35 using testing::MockFunction;
36 using testing::SaveArg;
37 using testing::StrictMock;
38
39 namespace grpc_core {
40
41 // A simple Barrier type: stalls progress until it is 'cleared'.
42 class Barrier {
43 public:
44 struct Result {};
45
Wait()46 Promise<Result> Wait() {
47 return [this]() -> Poll<Result> {
48 MutexLock lock(&mu_);
49 if (cleared_) {
50 return Result{};
51 } else {
52 return wait_set_.AddPending(GetContext<Activity>()->MakeOwningWaker());
53 }
54 };
55 }
56
Clear()57 void Clear() {
58 mu_.Lock();
59 cleared_ = true;
60 auto wakeup = wait_set_.TakeWakeupSet();
61 mu_.Unlock();
62 wakeup.Wakeup();
63 }
64
65 private:
66 Mutex mu_;
67 WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
68 bool cleared_ ABSL_GUARDED_BY(mu_) = false;
69 };
70
71 // A simple Barrier type: stalls progress until it is 'cleared'.
72 // This variant supports only a single waiter.
73 class SingleBarrier {
74 public:
75 struct Result {};
76
Wait()77 Promise<Result> Wait() {
78 return [this]() -> Poll<Result> {
79 MutexLock lock(&mu_);
80 if (cleared_) {
81 return Result{};
82 } else {
83 waker_ = GetContext<Activity>()->MakeOwningWaker();
84 return Pending();
85 }
86 };
87 }
88
Clear()89 void Clear() {
90 mu_.Lock();
91 cleared_ = true;
92 auto waker = std::move(waker_);
93 mu_.Unlock();
94 waker.Wakeup();
95 }
96
97 private:
98 Mutex mu_;
99 Waker waker_ ABSL_GUARDED_BY(mu_);
100 bool cleared_ ABSL_GUARDED_BY(mu_) = false;
101 };
102
TEST(ActivityTest,ImmediatelyCompleteWithSuccess)103 TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
104 StrictMock<MockFunction<void(absl::Status)>> on_done;
105 EXPECT_CALL(on_done, Call(absl::OkStatus()));
106 MakeActivity(
107 [] { return [] { return absl::OkStatus(); }; }, NoWakeupScheduler(),
108 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
109 }
110
TEST(ActivityTest,ImmediatelyCompleteWithFailure)111 TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
112 StrictMock<MockFunction<void(absl::Status)>> on_done;
113 EXPECT_CALL(on_done, Call(absl::CancelledError()));
114 MakeActivity(
115 [] { return [] { return absl::CancelledError(); }; }, NoWakeupScheduler(),
116 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
117 }
118
TEST(ActivityTest,DropImmediately)119 TEST(ActivityTest, DropImmediately) {
120 StrictMock<MockFunction<void(absl::Status)>> on_done;
121 EXPECT_CALL(on_done, Call(absl::CancelledError()));
122 MakeActivity(
123 [] { return []() -> Poll<absl::Status> { return Pending(); }; },
124 NoWakeupScheduler(),
125 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
126 }
127
128 template <typename B>
129 class BarrierTest : public ::testing::Test {
130 public:
131 using Type = B;
132 };
133
134 using BarrierTestTypes = ::testing::Types<Barrier, SingleBarrier>;
135 TYPED_TEST_SUITE(BarrierTest, BarrierTestTypes);
136
TYPED_TEST(BarrierTest,Barrier)137 TYPED_TEST(BarrierTest, Barrier) {
138 typename TestFixture::Type b;
139 StrictMock<MockFunction<void(absl::Status)>> on_done;
140 auto activity = MakeActivity(
141 [&b] {
142 return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
143 return absl::OkStatus();
144 });
145 },
146 InlineWakeupScheduler(),
147 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
148 // Clearing the barrier should let the activity proceed to return a result.
149 EXPECT_CALL(on_done, Call(absl::OkStatus()));
150 b.Clear();
151 }
152
TYPED_TEST(BarrierTest,BarrierPing)153 TYPED_TEST(BarrierTest, BarrierPing) {
154 typename TestFixture::Type b1;
155 typename TestFixture::Type b2;
156 StrictMock<MockFunction<void(absl::Status)>> on_done1;
157 StrictMock<MockFunction<void(absl::Status)>> on_done2;
158 MockCallbackScheduler scheduler1;
159 MockCallbackScheduler scheduler2;
160 auto activity1 = MakeActivity(
161 [&b1, &b2] {
162 return Seq(b1.Wait(), [&b2](typename TestFixture::Type::Result) {
163 // Clear the barrier whilst executing an activity
164 b2.Clear();
165 return absl::OkStatus();
166 });
167 },
168 UseMockCallbackScheduler{&scheduler1},
169 [&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
170 auto activity2 = MakeActivity(
171 [&b2] {
172 return Seq(b2.Wait(), [](typename TestFixture::Type::Result) {
173 return absl::OkStatus();
174 });
175 },
176 UseMockCallbackScheduler{&scheduler2},
177 [&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });
178 // Since barrier triggers inside activity1 promise, activity2 wakeup will be
179 // scheduled from a callback.
180 std::function<void()> cb1;
181 std::function<void()> cb2;
182 EXPECT_CALL(scheduler1, Schedule(_)).WillOnce(SaveArg<0>(&cb1));
183 b1.Clear();
184 Mock::VerifyAndClearExpectations(&scheduler1);
185 EXPECT_CALL(on_done1, Call(absl::OkStatus()));
186 EXPECT_CALL(scheduler2, Schedule(_)).WillOnce(SaveArg<0>(&cb2));
187 cb1();
188 Mock::VerifyAndClearExpectations(&on_done1);
189 EXPECT_CALL(on_done2, Call(absl::OkStatus()));
190 cb2();
191 }
192
TYPED_TEST(BarrierTest,WakeSelf)193 TYPED_TEST(BarrierTest, WakeSelf) {
194 typename TestFixture::Type b;
195 StrictMock<MockFunction<void(absl::Status)>> on_done;
196 EXPECT_CALL(on_done, Call(absl::OkStatus()));
197 MakeActivity(
198 [&b] {
199 return Seq(Join(b.Wait(),
200 [&b] {
201 b.Clear();
202 return 1;
203 }),
204 [](std::tuple<typename TestFixture::Type::Result, int>) {
205 return absl::OkStatus();
206 });
207 },
208 NoWakeupScheduler(),
209 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
210 }
211
TYPED_TEST(BarrierTest,WakeAfterDestruction)212 TYPED_TEST(BarrierTest, WakeAfterDestruction) {
213 typename TestFixture::Type b;
214 {
215 StrictMock<MockFunction<void(absl::Status)>> on_done;
216 EXPECT_CALL(on_done, Call(absl::CancelledError()));
217 MakeActivity(
218 [&b] {
219 return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
220 return absl::OkStatus();
221 });
222 },
223 InlineWakeupScheduler(),
224 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
225 }
226 b.Clear();
227 }
228
TEST(ActivityTest,ForceWakeup)229 TEST(ActivityTest, ForceWakeup) {
230 StrictMock<MockFunction<void(absl::Status)>> on_done;
231 int run_count = 0;
232 auto activity = MakeActivity(
233 [&run_count]() -> Poll<absl::Status> {
234 ++run_count;
235 switch (run_count) {
236 case 1:
237 return Pending{};
238 case 2:
239 return absl::OkStatus();
240 default:
241 abort();
242 }
243 },
244 InlineWakeupScheduler(),
245 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
246 EXPECT_CALL(on_done, Call(absl::OkStatus()));
247 activity->ForceWakeup();
248 }
249
250 struct TestContext {
251 bool* done;
252 };
253 template <>
254 struct ContextType<TestContext> {};
255
TEST(ActivityTest,WithContext)256 TEST(ActivityTest, WithContext) {
257 bool done = false;
258 StrictMock<MockFunction<void(absl::Status)>> on_done;
259 EXPECT_CALL(on_done, Call(absl::OkStatus()));
260 MakeActivity(
261 [] {
262 *GetContext<TestContext>()->done = true;
263 return Immediate(absl::OkStatus());
264 },
265 NoWakeupScheduler(),
266 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
267 TestContext{&done});
268 EXPECT_TRUE(done);
269 }
270
TEST(ActivityTest,CanCancelDuringExecution)271 TEST(ActivityTest, CanCancelDuringExecution) {
272 ActivityPtr activity;
273 StrictMock<MockFunction<void(absl::Status)>> on_done;
274 int run_count = 0;
275
276 activity = MakeActivity(
277 [&activity, &run_count]() -> Poll<absl::Status> {
278 ++run_count;
279 switch (run_count) {
280 case 1:
281 return Pending{};
282 case 2:
283 activity.reset();
284 return Pending{};
285 default:
286 abort();
287 }
288 },
289 InlineWakeupScheduler(),
290 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
291
292 EXPECT_CALL(on_done, Call(absl::CancelledError()));
293 activity->ForceWakeup();
294 }
295
TEST(ActivityTest,CanCancelDuringSuccessfulExecution)296 TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
297 ActivityPtr activity;
298 StrictMock<MockFunction<void(absl::Status)>> on_done;
299 int run_count = 0;
300
301 activity = MakeActivity(
302 [&activity, &run_count]() -> Poll<absl::Status> {
303 ++run_count;
304 switch (run_count) {
305 case 1:
306 return Pending{};
307 case 2:
308 activity.reset();
309 return absl::OkStatus();
310 default:
311 abort();
312 }
313 },
314 InlineWakeupScheduler(),
315 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
316
317 EXPECT_CALL(on_done, Call(absl::OkStatus()));
318 activity->ForceWakeup();
319 }
320
TEST(WakerTest,CanWakeupEmptyWaker)321 TEST(WakerTest, CanWakeupEmptyWaker) {
322 // Empty wakers should not do anything upon wakeup.
323 Waker().Wakeup();
324 }
325
326 } // namespace grpc_core
327
main(int argc,char ** argv)328 int main(int argc, char** argv) {
329 ::testing::InitGoogleTest(&argc, argv);
330 return RUN_ALL_TESTS();
331 }
332