1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/pipe.h"
16
17 #include <memory>
18 #include <tuple>
19 #include <utility>
20
21 #include "absl/functional/function_ref.h"
22 #include "absl/status/status.h"
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25
26 #include <grpc/event_engine/memory_allocator.h>
27 #include <grpc/grpc.h>
28
29 #include "src/core/lib/gprpp/crash.h"
30 #include "src/core/lib/gprpp/ref_counted_ptr.h"
31 #include "src/core/lib/promise/activity.h"
32 #include "src/core/lib/promise/join.h"
33 #include "src/core/lib/promise/map.h"
34 #include "src/core/lib/promise/seq.h"
35 #include "src/core/lib/resource_quota/memory_quota.h"
36 #include "src/core/lib/resource_quota/resource_quota.h"
37 #include "test/core/promise/test_wakeup_schedulers.h"
38
39 using testing::MockFunction;
40 using testing::StrictMock;
41
42 namespace grpc_core {
43
44 class PipeTest : public ::testing::Test {
45 protected:
46 MemoryAllocator memory_allocator_ = MemoryAllocator(
47 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
48 };
49
TEST_F(PipeTest,CanSendAndReceive)50 TEST_F(PipeTest, CanSendAndReceive) {
51 StrictMock<MockFunction<void(absl::Status)>> on_done;
52 EXPECT_CALL(on_done, Call(absl::OkStatus()));
53 MakeActivity(
54 [] {
55 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
56 return Seq(
57 // Concurrently: send 42 into the pipe, and receive from the pipe.
58 Join(pipe->sender.Push(42),
59 Map(pipe->receiver.Next(),
60 [](NextResult<int> r) { return r.value(); })),
61 // Once complete, verify successful sending and the received value
62 // is 42.
63 [](std::tuple<bool, int> result) {
64 EXPECT_TRUE(std::get<0>(result));
65 EXPECT_EQ(42, std::get<1>(result));
66 return absl::OkStatus();
67 });
68 },
69 NoWakeupScheduler(),
70 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
71 MakeScopedArena(1024, &memory_allocator_));
72 }
73
TEST_F(PipeTest,CanInterceptAndMapAtSender)74 TEST_F(PipeTest, CanInterceptAndMapAtSender) {
75 StrictMock<MockFunction<void(absl::Status)>> on_done;
76 EXPECT_CALL(on_done, Call(absl::OkStatus()));
77 MakeActivity(
78 [] {
79 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
80 pipe->sender.InterceptAndMap([](int value) { return value / 2; });
81 return Seq(
82 // Concurrently: send 42 into the pipe, and receive from the pipe.
83 Join(pipe->sender.Push(42),
84 Map(pipe->receiver.Next(),
85 [](NextResult<int> r) { return r.value(); })),
86 // Once complete, verify successful sending and the received value
87 // is 21.
88 [](std::tuple<bool, int> result) {
89 EXPECT_TRUE(std::get<0>(result));
90 EXPECT_EQ(21, std::get<1>(result));
91 return absl::OkStatus();
92 });
93 },
94 NoWakeupScheduler(),
95 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
96 MakeScopedArena(1024, &memory_allocator_));
97 }
98
TEST_F(PipeTest,CanInterceptAndMapAtReceiver)99 TEST_F(PipeTest, CanInterceptAndMapAtReceiver) {
100 StrictMock<MockFunction<void(absl::Status)>> on_done;
101 EXPECT_CALL(on_done, Call(absl::OkStatus()));
102 MakeActivity(
103 [] {
104 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
105 pipe->receiver.InterceptAndMap([](int value) { return value / 2; });
106 return Seq(
107 // Concurrently: send 42 into the pipe, and receive from the pipe.
108 Join(pipe->sender.Push(42),
109 Map(pipe->receiver.Next(),
110 [](NextResult<int> r) { return r.value(); })),
111 // Once complete, verify successful sending and the received value
112 // is 21.
113 [](std::tuple<bool, int> result) {
114 EXPECT_TRUE(std::get<0>(result));
115 EXPECT_EQ(21, std::get<1>(result));
116 return absl::OkStatus();
117 });
118 },
119 NoWakeupScheduler(),
120 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
121 MakeScopedArena(1024, &memory_allocator_));
122 }
123
TEST_F(PipeTest,InterceptionOrderingIsCorrect)124 TEST_F(PipeTest, InterceptionOrderingIsCorrect) {
125 StrictMock<MockFunction<void(absl::Status)>> on_done;
126 EXPECT_CALL(on_done, Call(absl::OkStatus()));
127 MakeActivity(
128 [] {
129 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<std::string>>();
130 auto appender = [](char c) {
131 return [c](std::string value) {
132 value += c;
133 return value;
134 };
135 };
136 // Interception get added outwardly from the center, and run from sender
137 // to receiver, so the following should result in append "abcd".
138 pipe->receiver.InterceptAndMap(appender('c'));
139 pipe->sender.InterceptAndMap(appender('b'));
140 pipe->receiver.InterceptAndMap(appender('d'));
141 pipe->sender.InterceptAndMap(appender('a'));
142 return Seq(
143 // Concurrently: send "" into the pipe, and receive from the pipe.
144 Join(pipe->sender.Push(""),
145 Map(pipe->receiver.Next(),
146 [](NextResult<std::string> r) { return r.value(); })),
147 // Once complete, verify successful sending and the received value
148 // is 21.
149 [](std::tuple<bool, std::string> result) {
150 EXPECT_TRUE(std::get<0>(result));
151 EXPECT_EQ("abcd", std::get<1>(result));
152 return absl::OkStatus();
153 });
154 },
155 NoWakeupScheduler(),
156 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
157 MakeScopedArena(1024, &memory_allocator_));
158 }
159
TEST_F(PipeTest,CanReceiveAndSend)160 TEST_F(PipeTest, CanReceiveAndSend) {
161 StrictMock<MockFunction<void(absl::Status)>> on_done;
162 EXPECT_CALL(on_done, Call(absl::OkStatus()));
163 MakeActivity(
164 [] {
165 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
166 return Seq(
167 // Concurrently: receive from the pipe, and send 42 into the pipe.
168 Join(Map(pipe->receiver.Next(),
169 [](NextResult<int> r) { return r.value(); }),
170 pipe->sender.Push(42)),
171 // Once complete, verify the received value is 42 and successful
172 // sending.
173 [](std::tuple<int, bool> result) {
174 EXPECT_EQ(std::get<0>(result), 42);
175 EXPECT_TRUE(std::get<1>(result));
176 return absl::OkStatus();
177 });
178 },
179 NoWakeupScheduler(),
180 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
181 MakeScopedArena(1024, &memory_allocator_));
182 }
183
TEST_F(PipeTest,CanSeeClosedOnSend)184 TEST_F(PipeTest, CanSeeClosedOnSend) {
185 StrictMock<MockFunction<void(absl::Status)>> on_done;
186 EXPECT_CALL(on_done, Call(absl::OkStatus()));
187 MakeActivity(
188 [] {
189 Pipe<int> pipe;
190 auto sender = std::move(pipe.sender);
191 auto receiver = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
192 std::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
193 return Seq(
194 // Concurrently:
195 // - push 43 into the sender, which will stall because there is no
196 // reader
197 // - and close the receiver, which will fail the pending send.
198 Join(sender.Push(43),
199 [receiver] {
200 receiver->reset();
201 return absl::OkStatus();
202 }),
203 // Verify both that the send failed and that we executed the close.
204 [](const std::tuple<bool, absl::Status>& result) {
205 EXPECT_EQ(result, std::make_tuple(false, absl::OkStatus()));
206 return absl::OkStatus();
207 });
208 },
209 NoWakeupScheduler(),
210 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
211 MakeScopedArena(1024, &memory_allocator_));
212 }
213
TEST_F(PipeTest,CanSeeClosedOnReceive)214 TEST_F(PipeTest, CanSeeClosedOnReceive) {
215 StrictMock<MockFunction<void(absl::Status)>> on_done;
216 EXPECT_CALL(on_done, Call(absl::OkStatus()));
217 MakeActivity(
218 [] {
219 Pipe<int> pipe;
220 auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
221 std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
222 auto receiver = std::move(pipe.receiver);
223 return Seq(
224 // Concurrently:
225 // - wait for a received value (will stall forever since we push
226 // nothing into the queue)
227 // - close the sender, which will signal the receiver to return an
228 // end-of-stream.
229 Join(receiver.Next(),
230 [sender] {
231 sender->reset();
232 return absl::OkStatus();
233 }),
234 // Verify we received end-of-stream and closed the sender.
235 [](std::tuple<NextResult<int>, absl::Status> result) {
236 EXPECT_FALSE(std::get<0>(result).has_value());
237 EXPECT_EQ(std::get<1>(result), absl::OkStatus());
238 return absl::OkStatus();
239 });
240 },
241 NoWakeupScheduler(),
242 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
243 MakeScopedArena(1024, &memory_allocator_));
244 }
245
TEST_F(PipeTest,CanCloseSend)246 TEST_F(PipeTest, CanCloseSend) {
247 StrictMock<MockFunction<void(absl::Status)>> on_done;
248 EXPECT_CALL(on_done, Call(absl::OkStatus()));
249 MakeActivity(
250 [] {
251 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
252 return Seq(
253 // Concurrently:
254 // - wait for a received value (will stall forever since we push
255 // nothing into the queue)
256 // - close the sender, which will signal the receiver to return an
257 // end-of-stream.
258 Join(pipe->receiver.Next(),
259 [pipe]() mutable {
260 pipe->sender.Close();
261 return absl::OkStatus();
262 }),
263 // Verify we received end-of-stream and closed the sender.
264 [](std::tuple<NextResult<int>, absl::Status> result) {
265 EXPECT_FALSE(std::get<0>(result).has_value());
266 EXPECT_FALSE(std::get<0>(result).cancelled());
267 EXPECT_EQ(std::get<1>(result), absl::OkStatus());
268 return absl::OkStatus();
269 });
270 },
271 NoWakeupScheduler(),
272 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
273 MakeScopedArena(1024, &memory_allocator_));
274 }
275
TEST_F(PipeTest,CanCloseWithErrorSend)276 TEST_F(PipeTest, CanCloseWithErrorSend) {
277 StrictMock<MockFunction<void(absl::Status)>> on_done;
278 EXPECT_CALL(on_done, Call(absl::OkStatus()));
279 MakeActivity(
280 [] {
281 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
282 return Seq(
283 // Concurrently:
284 // - wait for a received value (will stall forever since we push
285 // nothing into the queue)
286 // - close the sender, which will signal the receiver to return an
287 // end-of-stream.
288 Join(pipe->receiver.Next(),
289 [pipe]() mutable {
290 pipe->sender.CloseWithError();
291 return absl::OkStatus();
292 }),
293 // Verify we received end-of-stream and closed the sender.
294 [](std::tuple<NextResult<int>, absl::Status> result) {
295 EXPECT_FALSE(std::get<0>(result).has_value());
296 EXPECT_TRUE(std::get<0>(result).cancelled());
297 EXPECT_EQ(std::get<1>(result), absl::OkStatus());
298 return absl::OkStatus();
299 });
300 },
301 NoWakeupScheduler(),
302 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
303 MakeScopedArena(1024, &memory_allocator_));
304 }
305
TEST_F(PipeTest,CanCloseWithErrorRecv)306 TEST_F(PipeTest, CanCloseWithErrorRecv) {
307 StrictMock<MockFunction<void(absl::Status)>> on_done;
308 EXPECT_CALL(on_done, Call(absl::OkStatus()));
309 MakeActivity(
310 [] {
311 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
312 return Seq(
313 // Concurrently:
314 // - wait for a received value (will stall forever since we push
315 // nothing into the queue)
316 // - close the sender, which will signal the receiver to return an
317 // end-of-stream.
318 Join(pipe->receiver.Next(),
319 [pipe]() mutable {
320 pipe->receiver.CloseWithError();
321 return absl::OkStatus();
322 }),
323 // Verify we received end-of-stream and closed the sender.
324 [](std::tuple<NextResult<int>, absl::Status> result) {
325 EXPECT_FALSE(std::get<0>(result).has_value());
326 EXPECT_TRUE(std::get<0>(result).cancelled());
327 EXPECT_EQ(std::get<1>(result), absl::OkStatus());
328 return absl::OkStatus();
329 });
330 },
331 NoWakeupScheduler(),
332 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
333 MakeScopedArena(1024, &memory_allocator_));
334 }
335
TEST_F(PipeTest,CanCloseSendWithInterceptor)336 TEST_F(PipeTest, CanCloseSendWithInterceptor) {
337 StrictMock<MockFunction<void(absl::Status)>> on_done;
338 EXPECT_CALL(on_done, Call(absl::OkStatus()));
339 MakeActivity(
340 [] {
341 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
342 pipe->sender.InterceptAndMap([](int value) { return value + 1; });
343 return Seq(
344 // Concurrently:
345 // - wait for a received value (will stall forever since we push
346 // nothing into the queue)
347 // - close the sender, which will signal the receiver to return an
348 // end-of-stream.
349 Join(pipe->receiver.Next(),
350 [pipe]() mutable {
351 pipe->sender.Close();
352 return absl::OkStatus();
353 }),
354 // Verify we received end-of-stream and closed the sender.
355 [](std::tuple<NextResult<int>, absl::Status> result) {
356 EXPECT_FALSE(std::get<0>(result).has_value());
357 EXPECT_FALSE(std::get<0>(result).cancelled());
358 EXPECT_EQ(std::get<1>(result), absl::OkStatus());
359 return absl::OkStatus();
360 });
361 },
362 NoWakeupScheduler(),
363 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
364 MakeScopedArena(1024, &memory_allocator_));
365 }
366
TEST_F(PipeTest,CanCancelSendWithInterceptor)367 TEST_F(PipeTest, CanCancelSendWithInterceptor) {
368 StrictMock<MockFunction<void(absl::Status)>> on_done;
369 EXPECT_CALL(on_done, Call(absl::OkStatus()));
370 MakeActivity(
371 [] {
372 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
373 pipe->sender.InterceptAndMap([](int) { return absl::nullopt; });
374 return Seq(
375 // Concurrently:
376 // - wait for a received value (will stall forever since we push
377 // nothing into the queue)
378 // - close the sender, which will signal the receiver to return an
379 // end-of-stream.
380 Join(pipe->receiver.Next(), pipe->sender.Push(3)),
381 // Verify we received end-of-stream with cancellation and sent
382 // successfully.
383 [](std::tuple<NextResult<int>, bool> result) {
384 EXPECT_FALSE(std::get<0>(result).has_value());
385 EXPECT_TRUE(std::get<0>(result).cancelled());
386 EXPECT_FALSE(std::get<1>(result));
387 return absl::OkStatus();
388 });
389 },
390 NoWakeupScheduler(),
391 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
392 MakeScopedArena(1024, &memory_allocator_));
393 }
394
TEST_F(PipeTest,CanFlowControlThroughManyStages)395 TEST_F(PipeTest, CanFlowControlThroughManyStages) {
396 StrictMock<MockFunction<void(absl::Status)>> on_done;
397 EXPECT_CALL(on_done, Call(absl::OkStatus()));
398 auto done = std::make_shared<bool>(false);
399 // Push a value through multiple pipes.
400 // Ensure that it's possible to do so and get flow control throughout the
401 // entire pipe: ie that the push down does not complete until the last pipe
402 // completes.
403 MakeActivity(
404 [done] {
405 auto* pipe1 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
406 auto* pipe2 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
407 auto* pipe3 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
408 auto* sender1 = &pipe1->sender;
409 auto* receiver1 = &pipe1->receiver;
410 auto* sender2 = &pipe2->sender;
411 auto* receiver2 = &pipe2->receiver;
412 auto* sender3 = &pipe3->sender;
413 auto* receiver3 = &pipe3->receiver;
414 return Seq(Join(Seq(sender1->Push(1),
415 [done] {
416 *done = true;
417 return 1;
418 }),
419 Seq(receiver1->Next(),
420 [sender2](NextResult<int> r) mutable {
421 return sender2->Push(r.value());
422 }),
423 Seq(receiver2->Next(),
424 [sender3](NextResult<int> r) mutable {
425 return sender3->Push(r.value());
426 }),
427 Seq(receiver3->Next(),
428 [done](NextResult<int> r) {
429 EXPECT_EQ(r.value(), 1);
430 EXPECT_FALSE(*done);
431 return 2;
432 })),
433 [](std::tuple<int, bool, bool, int> result) {
434 EXPECT_EQ(result, std::make_tuple(1, true, true, 2));
435 return absl::OkStatus();
436 });
437 },
438 NoWakeupScheduler(),
439 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
440 MakeScopedArena(1024, &memory_allocator_));
441 ASSERT_TRUE(*done);
442 }
443
TEST_F(PipeTest,AwaitClosedWorks)444 TEST_F(PipeTest, AwaitClosedWorks) {
445 StrictMock<MockFunction<void(absl::Status)>> on_done;
446 EXPECT_CALL(on_done, Call(absl::OkStatus()));
447 MakeActivity(
448 [] {
449 auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
450 pipe->sender.InterceptAndMap([](int value) { return value + 1; });
451 return Seq(
452 // Concurrently:
453 // - wait for closed on both ends
454 // - close the sender, which will signal the receiver to return an
455 // end-of-stream.
456 Join(pipe->receiver.AwaitClosed(), pipe->sender.AwaitClosed(),
457 [pipe]() mutable {
458 pipe->sender.Close();
459 return absl::OkStatus();
460 }),
461 // Verify we received end-of-stream and closed the sender.
462 [](std::tuple<bool, bool, absl::Status> result) {
463 EXPECT_FALSE(std::get<0>(result));
464 EXPECT_FALSE(std::get<1>(result));
465 EXPECT_EQ(std::get<2>(result), absl::OkStatus());
466 return absl::OkStatus();
467 });
468 },
469 NoWakeupScheduler(),
470 [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
471 MakeScopedArena(1024, &memory_allocator_));
472 }
473
474 class FakeActivity final : public Activity {
475 public:
Orphan()476 void Orphan() override {}
ForceImmediateRepoll(WakeupMask)477 void ForceImmediateRepoll(WakeupMask) override {}
MakeOwningWaker()478 Waker MakeOwningWaker() override { Crash("Not implemented"); }
MakeNonOwningWaker()479 Waker MakeNonOwningWaker() override { Crash("Not implemented"); }
Run(absl::FunctionRef<void ()> f)480 void Run(absl::FunctionRef<void()> f) {
481 ScopedActivity activity(this);
482 f();
483 }
484 };
485
TEST_F(PipeTest,PollAckWaitsForReadyClosed)486 TEST_F(PipeTest, PollAckWaitsForReadyClosed) {
487 FakeActivity().Run([]() {
488 pipe_detail::Center<int> c;
489 int i = 1;
490 EXPECT_EQ(c.Push(&i), Poll<bool>(true));
491 c.MarkClosed();
492 EXPECT_EQ(c.PollAck(), Poll<bool>(Pending{}));
493 });
494 }
495
496 } // namespace grpc_core
497
main(int argc,char ** argv)498 int main(int argc, char** argv) {
499 ::testing::InitGoogleTest(&argc, argv);
500 grpc_init();
501 int r = RUN_ALL_TESTS();
502 grpc_shutdown();
503 return r;
504 }
505