xref: /aosp_15_r20/external/grpc-grpc/test/core/promise/pipe_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/pipe.h"
16 
17 #include <memory>
18 #include <tuple>
19 #include <utility>
20 
21 #include "absl/functional/function_ref.h"
22 #include "absl/status/status.h"
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25 
26 #include <grpc/event_engine/memory_allocator.h>
27 #include <grpc/grpc.h>
28 
29 #include "src/core/lib/gprpp/crash.h"
30 #include "src/core/lib/gprpp/ref_counted_ptr.h"
31 #include "src/core/lib/promise/activity.h"
32 #include "src/core/lib/promise/join.h"
33 #include "src/core/lib/promise/map.h"
34 #include "src/core/lib/promise/seq.h"
35 #include "src/core/lib/resource_quota/memory_quota.h"
36 #include "src/core/lib/resource_quota/resource_quota.h"
37 #include "test/core/promise/test_wakeup_schedulers.h"
38 
39 using testing::MockFunction;
40 using testing::StrictMock;
41 
42 namespace grpc_core {
43 
44 class PipeTest : public ::testing::Test {
45  protected:
46   MemoryAllocator memory_allocator_ = MemoryAllocator(
47       ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
48 };
49 
// Pushes 42 into a pipe while reading from it, and expects the push to
// succeed and the reader to observe 42.
TEST_F(PipeTest, CanSendAndReceive) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Extracts the value carried by the reader's result.
        auto unwrap = [](NextResult<int> next) { return next.value(); };
        // Checks the push result and the value that came out of the pipe.
        auto verify = [](std::tuple<bool, int> outcome) {
          const bool pushed = std::get<0>(outcome);
          const int received = std::get<1>(outcome);
          EXPECT_TRUE(pushed);
          EXPECT_EQ(42, received);
          return absl::OkStatus();
        };
        // Concurrently push 42 and read from the pipe; then verify.
        return Seq(Join(pipe->sender.Push(42),
                        Map(pipe->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
73 
// Installs a halving interceptor on the sender, pushes 42, and expects the
// reader to observe 21.
TEST_F(PipeTest, CanInterceptAndMapAtSender) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto halve = [](int value) { return value / 2; };
        pipe->sender.InterceptAndMap(std::move(halve));
        // Extracts the value carried by the reader's result.
        auto unwrap = [](NextResult<int> next) { return next.value(); };
        // Checks the push succeeded and that the mapped value (21) arrived.
        auto verify = [](std::tuple<bool, int> outcome) {
          const bool pushed = std::get<0>(outcome);
          const int received = std::get<1>(outcome);
          EXPECT_TRUE(pushed);
          EXPECT_EQ(21, received);
          return absl::OkStatus();
        };
        // Concurrently push 42 and read from the pipe; then verify.
        return Seq(Join(pipe->sender.Push(42),
                        Map(pipe->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
98 
// Installs a halving interceptor on the receiver, pushes 42, and expects the
// reader to observe 21.
TEST_F(PipeTest, CanInterceptAndMapAtReceiver) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto halve = [](int value) { return value / 2; };
        pipe->receiver.InterceptAndMap(std::move(halve));
        // Extracts the value carried by the reader's result.
        auto unwrap = [](NextResult<int> next) { return next.value(); };
        // Checks the push succeeded and that the mapped value (21) arrived.
        auto verify = [](std::tuple<bool, int> outcome) {
          const bool pushed = std::get<0>(outcome);
          const int received = std::get<1>(outcome);
          EXPECT_TRUE(pushed);
          EXPECT_EQ(21, received);
          return absl::OkStatus();
        };
        // Concurrently push 42 and read from the pipe; then verify.
        return Seq(Join(pipe->sender.Push(42),
                        Map(pipe->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
123 
// Registers four appending interceptors, alternating between receiver and
// sender, and expects an empty string pushed through the pipe to arrive as
// "abcd".
TEST_F(PipeTest, InterceptionOrderingIsCorrect) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<std::string>>();
        // Builds an interceptor that appends `suffix` to the passing value.
        auto append_char = [](char suffix) {
          return [suffix](std::string text) {
            text.push_back(suffix);
            return text;
          };
        };
        // Interceptors are added outwardly from the center and run from
        // sender to receiver, so this registration order yields "abcd".
        pipe->receiver.InterceptAndMap(append_char('c'));
        pipe->sender.InterceptAndMap(append_char('b'));
        pipe->receiver.InterceptAndMap(append_char('d'));
        pipe->sender.InterceptAndMap(append_char('a'));
        // Extracts the string carried by the reader's result.
        auto unwrap = [](NextResult<std::string> next) {
          return next.value();
        };
        // Checks the push succeeded and the received string is "abcd".
        auto verify = [](std::tuple<bool, std::string> outcome) {
          const bool pushed = std::get<0>(outcome);
          const std::string& received = std::get<1>(outcome);
          EXPECT_TRUE(pushed);
          EXPECT_EQ("abcd", received);
          return absl::OkStatus();
        };
        // Concurrently push "" and read from the pipe; then verify.
        return Seq(Join(pipe->sender.Push(""),
                        Map(pipe->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
159 
// Same exchange as CanSendAndReceive, but with the reader listed before the
// push inside the Join.
TEST_F(PipeTest, CanReceiveAndSend) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Extracts the value carried by the reader's result.
        auto unwrap = [](NextResult<int> next) { return next.value(); };
        // Checks that 42 was received and that the push succeeded.
        auto verify = [](std::tuple<int, bool> outcome) {
          const int received = std::get<0>(outcome);
          const bool pushed = std::get<1>(outcome);
          EXPECT_EQ(received, 42);
          EXPECT_TRUE(pushed);
          return absl::OkStatus();
        };
        // Concurrently read from the pipe and push 42; then verify.
        return Seq(Join(Map(pipe->receiver.Next(), std::move(unwrap)),
                        pipe->sender.Push(42)),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
183 
// Starts a push with no reader and then destroys the receiver; the push is
// expected to resolve to false.
TEST_F(PipeTest, CanSeeClosedOnSend) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        Pipe<int> pipe;
        auto sender = std::move(pipe.sender);
        // The receiver lives behind a shared handle so the closing step
        // below can destroy it on demand.
        auto receiver_holder =
            std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
                std::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
        return Seq(
            // Concurrently:
            // - push 43, which stalls because nothing reads from the pipe;
            // - drop the receiver, which fails the pending push.
            Join(sender.Push(43),
                 [receiver_holder] {
                   receiver_holder->reset();
                   return absl::OkStatus();
                 }),
            // The push must report failure and the closing step must have
            // run to completion.
            [](const std::tuple<bool, absl::Status>& outcome) {
              EXPECT_FALSE(std::get<0>(outcome));
              EXPECT_EQ(std::get<1>(outcome), absl::OkStatus());
              return absl::OkStatus();
            });
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
213 
// Starts a read with nothing pushed and then destroys the sender; the read
// is expected to resolve without a value.
TEST_F(PipeTest, CanSeeClosedOnReceive) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        Pipe<int> pipe;
        // The sender lives behind a shared handle so the closing step below
        // can destroy it on demand.
        auto sender_holder =
            std::make_shared<std::unique_ptr<PipeSender<int>>>(
                std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
        auto receiver = std::move(pipe.receiver);
        return Seq(
            // Concurrently:
            // - wait for a value, which stalls because nothing is pushed;
            // - drop the sender, which signals end-of-stream to the reader.
            Join(receiver.Next(),
                 [sender_holder] {
                   sender_holder->reset();
                   return absl::OkStatus();
                 }),
            // The reader must see no value and the closing step must have
            // run to completion.
            [](std::tuple<NextResult<int>, absl::Status> outcome) {
              auto& next = std::get<0>(outcome);
              const absl::Status& close_status = std::get<1>(outcome);
              EXPECT_FALSE(next.has_value());
              EXPECT_EQ(close_status, absl::OkStatus());
              return absl::OkStatus();
            });
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
245 
// Closes the sender explicitly while a read is pending; the read is expected
// to finish with no value and without being marked cancelled.
TEST_F(PipeTest, CanCloseSend) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Closes the sending end, which signals end-of-stream to the reader.
        auto close_sender = [pipe]() mutable {
          pipe->sender.Close();
          return absl::OkStatus();
        };
        // Expects a clean (non-cancelled) end-of-stream and a completed close.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          auto& next = std::get<0>(outcome);
          const absl::Status& close_status = std::get<1>(outcome);
          EXPECT_FALSE(next.has_value());
          EXPECT_FALSE(next.cancelled());
          EXPECT_EQ(close_status, absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently wait for a value (nothing is ever pushed) and close
        // the sender; then verify.
        return Seq(Join(pipe->receiver.Next(), std::move(close_sender)),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
275 
// Closes the sender with an error while a read is pending; the read is
// expected to finish with no value and to be marked cancelled.
TEST_F(PipeTest, CanCloseWithErrorSend) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Closes the sending end with an error.
        auto fail_sender = [pipe]() mutable {
          pipe->sender.CloseWithError();
          return absl::OkStatus();
        };
        // Expects a cancelled result with no value, and a completed close.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          auto& next = std::get<0>(outcome);
          const absl::Status& close_status = std::get<1>(outcome);
          EXPECT_FALSE(next.has_value());
          EXPECT_TRUE(next.cancelled());
          EXPECT_EQ(close_status, absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently wait for a value (nothing is ever pushed) and close
        // the sender with an error; then verify.
        return Seq(Join(pipe->receiver.Next(), std::move(fail_sender)),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
305 
// Closes the receiver with an error while its own read is pending; the read
// is expected to finish with no value and to be marked cancelled.
TEST_F(PipeTest, CanCloseWithErrorRecv) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Closes the receiving end with an error.
        auto fail_receiver = [pipe]() mutable {
          pipe->receiver.CloseWithError();
          return absl::OkStatus();
        };
        // Expects a cancelled result with no value, and a completed close.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          auto& next = std::get<0>(outcome);
          const absl::Status& close_status = std::get<1>(outcome);
          EXPECT_FALSE(next.has_value());
          EXPECT_TRUE(next.cancelled());
          EXPECT_EQ(close_status, absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently wait for a value (nothing is ever pushed) and close
        // the receiver with an error; then verify.
        return Seq(Join(pipe->receiver.Next(), std::move(fail_receiver)),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
335 
// Same as CanCloseSend, but with an interceptor installed on the sender: the
// pending read is still expected to end cleanly (no value, not cancelled).
TEST_F(PipeTest, CanCloseSendWithInterceptor) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto increment = [](int value) { return value + 1; };
        pipe->sender.InterceptAndMap(std::move(increment));
        // Closes the sending end, which signals end-of-stream to the reader.
        auto close_sender = [pipe]() mutable {
          pipe->sender.Close();
          return absl::OkStatus();
        };
        // Expects a clean (non-cancelled) end-of-stream and a completed close.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          auto& next = std::get<0>(outcome);
          const absl::Status& close_status = std::get<1>(outcome);
          EXPECT_FALSE(next.has_value());
          EXPECT_FALSE(next.cancelled());
          EXPECT_EQ(close_status, absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently wait for a value (nothing is ever pushed) and close
        // the sender; then verify.
        return Seq(Join(pipe->receiver.Next(), std::move(close_sender)),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
366 
// Installs a sender interceptor that returns absl::nullopt for every value,
// pushes 3, and expects the reader to see a cancelled result with no value
// while the push itself resolves to false.
TEST_F(PipeTest, CanCancelSendWithInterceptor) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        pipe->sender.InterceptAndMap([](int) { return absl::nullopt; });
        // Expects a cancelled read with no value and a failed push.
        auto verify = [](std::tuple<NextResult<int>, bool> outcome) {
          auto& next = std::get<0>(outcome);
          const bool pushed = std::get<1>(outcome);
          EXPECT_FALSE(next.has_value());
          EXPECT_TRUE(next.cancelled());
          EXPECT_FALSE(pushed);
          return absl::OkStatus();
        };
        // Concurrently wait for a value and push 3 through the interceptor;
        // then verify.
        return Seq(Join(pipe->receiver.Next(), pipe->sender.Push(3)),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
394 
// Relays a single value through three chained pipes and checks flow control
// across the whole chain: the first push must not complete before the value
// has been read out of the last pipe.
TEST_F(PipeTest, CanFlowControlThroughManyStages) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Set once the push into the first pipe has completed.
  auto first_push_done = std::make_shared<bool>(false);
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [first_push_done] {
        auto* pipe1 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto* pipe2 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto* pipe3 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Stage 0: push 1 into the first pipe and record its completion.
        auto feed = Seq(pipe1->sender.Push(1), [first_push_done] {
          *first_push_done = true;
          return 1;
        });
        // Stage 1: forward whatever leaves pipe1 into pipe2.
        auto relay_1_to_2 =
            Seq(pipe1->receiver.Next(), [pipe2](NextResult<int> r) mutable {
              return pipe2->sender.Push(r.value());
            });
        // Stage 2: forward whatever leaves pipe2 into pipe3.
        auto relay_2_to_3 =
            Seq(pipe2->receiver.Next(), [pipe3](NextResult<int> r) mutable {
              return pipe3->sender.Push(r.value());
            });
        // Stage 3: read the final value; the first push must still be
        // outstanding at this point.
        auto drain =
            Seq(pipe3->receiver.Next(), [first_push_done](NextResult<int> r) {
              EXPECT_EQ(r.value(), 1);
              EXPECT_FALSE(*first_push_done);
              return 2;
            });
        return Seq(Join(std::move(feed), std::move(relay_1_to_2),
                        std::move(relay_2_to_3), std::move(drain)),
                   [](std::tuple<int, bool, bool, int> outcome) {
                     EXPECT_EQ(std::get<0>(outcome), 1);
                     EXPECT_TRUE(std::get<1>(outcome));
                     EXPECT_TRUE(std::get<2>(outcome));
                     EXPECT_EQ(std::get<3>(outcome), 2);
                     return absl::OkStatus();
                   });
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
  ASSERT_TRUE(*first_push_done);
}
443 
// Waits on AwaitClosed() for both ends of a pipe while closing the sender,
// and expects both waits to resolve to false.
TEST_F(PipeTest, AwaitClosedWorks) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Relays the activity's final status to the strict mock.
  auto report = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };
  MakeActivity(
      [] {
        auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto increment = [](int value) { return value + 1; };
        pipe->sender.InterceptAndMap(std::move(increment));
        // Closes the sending end of the pipe.
        auto close_sender = [pipe]() mutable {
          pipe->sender.Close();
          return absl::OkStatus();
        };
        // Expects both AwaitClosed() results to be false and the close step
        // to have completed.
        auto verify = [](std::tuple<bool, bool, absl::Status> outcome) {
          const bool receiver_result = std::get<0>(outcome);
          const bool sender_result = std::get<1>(outcome);
          const absl::Status& close_status = std::get<2>(outcome);
          EXPECT_FALSE(receiver_result);
          EXPECT_FALSE(sender_result);
          EXPECT_EQ(close_status, absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently wait for closure on both ends and close the sender;
        // then verify.
        return Seq(Join(pipe->receiver.AwaitClosed(),
                        pipe->sender.AwaitClosed(), std::move(close_sender)),
                   std::move(verify));
      },
      NoWakeupScheduler(), std::move(report),
      MakeScopedArena(1024, &memory_allocator_));
}
473 
474 class FakeActivity final : public Activity {
475  public:
Orphan()476   void Orphan() override {}
ForceImmediateRepoll(WakeupMask)477   void ForceImmediateRepoll(WakeupMask) override {}
MakeOwningWaker()478   Waker MakeOwningWaker() override { Crash("Not implemented"); }
MakeNonOwningWaker()479   Waker MakeNonOwningWaker() override { Crash("Not implemented"); }
Run(absl::FunctionRef<void ()> f)480   void Run(absl::FunctionRef<void()> f) {
481     ScopedActivity activity(this);
482     f();
483   }
484 };
485 
// After a successful push followed by MarkClosed(), PollAck() on the center
// is expected to stay pending.
TEST_F(PipeTest, PollAckWaitsForReadyClosed) {
  FakeActivity activity;
  activity.Run([]() {
    pipe_detail::Center<int> center;
    int payload = 1;
    const Poll<bool> push_result = center.Push(&payload);
    EXPECT_EQ(push_result, Poll<bool>(true));
    center.MarkClosed();
    const Poll<bool> ack_result = center.PollAck();
    EXPECT_EQ(ack_result, Poll<bool>(Pending{}));
  });
}
495 
496 }  // namespace grpc_core
497 
main(int argc,char ** argv)498 int main(int argc, char** argv) {
499   ::testing::InitGoogleTest(&argc, argv);
500   grpc_init();
501   int r = RUN_ALL_TESTS();
502   grpc_shutdown();
503   return r;
504 }
505