xref: /aosp_15_r20/external/grpc-grpc/test/core/promise/map_pipe_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/map_pipe.h"
16 
17 #include <memory>
18 #include <utility>
19 
20 #include "gmock/gmock.h"
21 #include "gtest/gtest.h"
22 
23 #include <grpc/event_engine/memory_allocator.h>
24 
25 #include "src/core/lib/gprpp/ref_counted_ptr.h"
26 #include "src/core/lib/promise/activity.h"
27 #include "src/core/lib/promise/for_each.h"
28 #include "src/core/lib/promise/join.h"
29 #include "src/core/lib/promise/map.h"
30 #include "src/core/lib/promise/pipe.h"
31 #include "src/core/lib/promise/poll.h"
32 #include "src/core/lib/promise/seq.h"
33 #include "src/core/lib/resource_quota/arena.h"
34 #include "src/core/lib/resource_quota/memory_quota.h"
35 #include "src/core/lib/resource_quota/resource_quota.h"
36 #include "test/core/promise/test_wakeup_schedulers.h"
37 
38 using testing::Mock;
39 using testing::MockFunction;
40 using testing::StrictMock;
41 
42 namespace grpc_core {
43 
44 template <typename T>
45 class Delayed {
46  public:
Delayed(T x)47   explicit Delayed(T x) : x_(x) {}
48 
operator ()()49   Poll<T> operator()() {
50     GetContext<Activity>()->ForceImmediateRepoll();
51     ++polls_;
52     if (polls_ == 10) return std::move(x_);
53     return Pending();
54   }
55 
56  private:
57   int polls_ = 0;
58   T x_;
59 };
60 
61 class MapPipeTest : public ::testing::Test {
62  protected:
63   MemoryAllocator memory_allocator_ = MemoryAllocator(
64       ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
65 };
66 
TEST_F(MapPipeTest,SendThriceWithPipeInterceptingReceive)67 TEST_F(MapPipeTest, SendThriceWithPipeInterceptingReceive) {
68   int num_received = 0;
69   StrictMock<MockFunction<void(absl::Status)>> on_done;
70   EXPECT_CALL(on_done, Call(absl::OkStatus()));
71   MakeActivity(
72       [&num_received] {
73         Pipe<int> pipe;
74         auto filter =
75             PipeMapper<int>::Intercept(pipe.receiver).TakeAndRun([](int x) {
76               return Delayed<int>(x + 1);
77             });
78         auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
79             std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
80         return Map(
81             Join(
82                 std::move(filter),
83                 // Push 3 things into a pipe -- 0, 1, then 2 -- then close.
84                 Seq((*sender)->Push(0), [sender] { return (*sender)->Push(1); },
85                     [sender] { return (*sender)->Push(2); },
86                     [sender] {
87                       sender->reset();
88                       return absl::OkStatus();
89                     }),
90                 // Use a ForEach loop to read them out and verify all values are
91                 // seen (but with 1 added).
92                 ForEach(std::move(pipe.receiver),
93                         [&num_received](int i) {
94                           num_received++;
95                           EXPECT_EQ(num_received, i);
96                           return absl::OkStatus();
97                         })),
98             JustElem<2>());
99       },
100       NoWakeupScheduler(),
101       [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
102       MakeScopedArena(1024, &memory_allocator_));
103   Mock::VerifyAndClearExpectations(&on_done);
104   EXPECT_EQ(num_received, 3);
105 }
106 
TEST_F(MapPipeTest,SendThriceWithPipeInterceptingSend)107 TEST_F(MapPipeTest, SendThriceWithPipeInterceptingSend) {
108   int num_received = 0;
109   StrictMock<MockFunction<void(absl::Status)>> on_done;
110   EXPECT_CALL(on_done, Call(absl::OkStatus()));
111   MakeActivity(
112       [&num_received] {
113         Pipe<int> pipe;
114         auto filter =
115             PipeMapper<int>::Intercept(pipe.sender).TakeAndRun([](int x) {
116               return Delayed<int>(x + 1);
117             });
118         auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
119             std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
120         return Map(
121             Join(
122                 std::move(filter),
123                 // Push 3 things into a pipe -- 0, 1, then 2 -- then close.
124                 Seq((*sender)->Push(0), [sender] { return (*sender)->Push(1); },
125                     [sender] { return (*sender)->Push(2); },
126                     [sender] {
127                       sender->reset();
128                       return absl::OkStatus();
129                     }),
130                 // Use a ForEach loop to read them out and verify all values are
131                 // seen (but with 1 added).
132                 ForEach(std::move(pipe.receiver),
133                         [&num_received](int i) {
134                           num_received++;
135                           EXPECT_EQ(num_received, i);
136                           return absl::OkStatus();
137                         })),
138             JustElem<2>());
139       },
140       NoWakeupScheduler(),
141       [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
142       MakeScopedArena(1024, &memory_allocator_));
143   Mock::VerifyAndClearExpectations(&on_done);
144   EXPECT_EQ(num_received, 3);
145 }
146 
147 }  // namespace grpc_core
148 
main(int argc,char ** argv)149 int main(int argc, char** argv) {
150   ::testing::InitGoogleTest(&argc, argv);
151   return RUN_ALL_TESTS();
152 }
153