xref: /aosp_15_r20/external/grpc-grpc/test/core/promise/for_each_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/for_each.h"
16 
17 #include <memory>
18 
19 #include "gmock/gmock.h"
20 #include "gtest/gtest.h"
21 
22 #include <grpc/event_engine/memory_allocator.h>
23 
24 #include "src/core/lib/gprpp/ref_counted_ptr.h"
25 #include "src/core/lib/promise/activity.h"
26 #include "src/core/lib/promise/inter_activity_pipe.h"
27 #include "src/core/lib/promise/join.h"
28 #include "src/core/lib/promise/map.h"
29 #include "src/core/lib/promise/pipe.h"
30 #include "src/core/lib/promise/seq.h"
31 #include "src/core/lib/promise/try_seq.h"
32 #include "src/core/lib/resource_quota/arena.h"
33 #include "src/core/lib/resource_quota/memory_quota.h"
34 #include "src/core/lib/resource_quota/resource_quota.h"
35 #include "test/core/promise/test_wakeup_schedulers.h"
36 
37 using testing::Mock;
38 using testing::MockFunction;
39 using testing::StrictMock;
40 
41 namespace grpc_core {
42 
43 class ForEachTest : public ::testing::Test {
44  protected:
45   MemoryAllocator memory_allocator_ = MemoryAllocator(
46       ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
47 };
48 
// Runs one activity that joins two promises over a Pipe<int>:
//   * a producer that pushes 1, 2 and 3 and then drops the sender, and
//   * a ForEach loop over the receiver that checks each value as it arrives.
// The activity must finish with an OK status after exactly three values.
TEST_F(ForEachTest, SendThriceWithPipe) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  MakeActivity(
      [&num_received] {
        Pipe<int> pipe;
        // Each producer stage needs the sender, so it lives behind a
        // shared_ptr; the inner unique_ptr is what the last stage resets to
        // drop the sender.
        auto sender_box = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
        // Producer stages that run after the initial Push(1).
        auto push_two = [sender_box] { return (*sender_box)->Push(2); };
        auto push_three = [sender_box] { return (*sender_box)->Push(3); };
        auto close_pipe = [sender_box] {
          sender_box->reset();
          return absl::OkStatus();
        };
        // Consumer action: the n-th value received must equal n.
        auto check_value = [&num_received](int value) {
          ++num_received;
          EXPECT_EQ(num_received, value);
          return absl::OkStatus();
        };
        return Map(
            Join(
                // Push 1, 2, then 3 into the pipe, then close it.
                Seq((*sender_box)->Push(1), std::move(push_two),
                    std::move(push_three), std::move(close_pipe)),
                // Read the values back out and verify all of them are seen.
                ForEach(std::move(pipe.receiver), std::move(check_value))),
            // Report the second joined result (the ForEach status).
            JustElem<1>());
      },
      NoWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); },
      MakeScopedArena(1024, &memory_allocator_));
  // The completion callback is expected to have fired by this point.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 3);
}
83 
// Same scenario as the single-activity pipe test, but the producer and the
// consumer are separate activities connected by an InterActivityPipe<int, 1>.
// Both activities must complete with an OK status and the consumer must see
// 1, 2 and 3 in that order.
TEST_F(ForEachTest, SendThriceWithInterActivityPipe) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done_sender;
  StrictMock<MockFunction<void(absl::Status)>> on_done_receiver;
  EXPECT_CALL(on_done_sender, Call(absl::OkStatus()));
  EXPECT_CALL(on_done_receiver, Call(absl::OkStatus()));
  InterActivityPipe<int, 1> pipe;

  // Producer stages that run after the initial Push(1).
  auto push_two = [&pipe] { return pipe.sender.Push(2); };
  auto push_three = [&pipe] { return pipe.sender.Push(3); };
  auto close_pipe = [&pipe] {
    // Moving the sender into a local that immediately goes out of scope
    // destroys it, which is how this test closes the sending side.
    auto dropped_sender = std::move(pipe.sender);
    return absl::OkStatus();
  };
  // The sending activity is kept in a local for the rest of the test.
  auto send_activity = MakeActivity(
      // Push 1, 2, then 3 into the pipe, then close it.
      Seq(pipe.sender.Push(1), push_two, push_three, close_pipe),
      InlineWakeupScheduler{}, [&on_done_sender](absl::Status result) {
        on_done_sender.Call(std::move(result));
      });

  MakeActivity(
      [&num_received, &pipe] {
        // Read the values back out with a ForEach loop; the n-th value
        // received must equal n.
        auto check_value = [&num_received](int value) {
          ++num_received;
          EXPECT_EQ(num_received, value);
          return absl::OkStatus();
        };
        return ForEach(std::move(pipe.receiver), std::move(check_value));
      },
      NoWakeupScheduler(),
      [&on_done_receiver](absl::Status result) {
        on_done_receiver.Call(std::move(result));
      });

  // Both completion callbacks are expected to have fired by this point.
  Mock::VerifyAndClearExpectations(&on_done_sender);
  Mock::VerifyAndClearExpectations(&on_done_receiver);
  EXPECT_EQ(num_received, 3);
}
121 
122 // Pollable type that stays movable until it's polled, then causes the test to
123 // fail if it's moved again.
124 // Promises have the property that they can be moved until polled, and this
125 // helps us check that the internals of ForEach respect this rule.
126 class MoveableUntilPolled {
127  public:
128   MoveableUntilPolled() = default;
129   MoveableUntilPolled(const MoveableUntilPolled&) = delete;
130   MoveableUntilPolled& operator=(const MoveableUntilPolled&) = delete;
MoveableUntilPolled(MoveableUntilPolled && other)131   MoveableUntilPolled(MoveableUntilPolled&& other) noexcept : polls_(0) {
132     EXPECT_EQ(other.polls_, 0);
133   }
operator =(MoveableUntilPolled && other)134   MoveableUntilPolled& operator=(MoveableUntilPolled&& other) noexcept {
135     EXPECT_EQ(other.polls_, 0);
136     polls_ = 0;
137     return *this;
138   }
139 
operator ()()140   Poll<absl::Status> operator()() {
141     GetContext<Activity>()->ForceImmediateRepoll();
142     ++polls_;
143     if (polls_ == 10) return absl::OkStatus();
144     return Pending();
145   }
146 
147  private:
148   int polls_ = 0;
149 };
150 
// Checks that ForEach never moves the promise returned by its action once
// that promise has been polled. The action returns a MoveableUntilPolled,
// which fails the test if it is moved after its first poll.
TEST_F(ForEachTest, NoMoveAfterPoll) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  MakeActivity(
      [&num_received] {
        Pipe<int> pipe;
        // Shared so the closing stage below can reset the inner unique_ptr
        // and thereby drop the sender.
        auto sender_box = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
        auto close_pipe = [sender_box] {
          sender_box->reset();
          return absl::OkStatus();
        };
        // Consumer action: verify the value, then hand ForEach a promise that
        // objects to being moved after it has been polled.
        auto check_value = [&num_received](int value) {
          ++num_received;
          EXPECT_EQ(num_received, value);
          return MoveableUntilPolled();
        };
        return Map(
            Join(
                // Push one thing into the pipe, then close it.
                Seq((*sender_box)->Push(1), std::move(close_pipe)),
                // Read the value back out with a ForEach loop and verify it
                // is seen.
                ForEach(std::move(pipe.receiver), std::move(check_value))),
            // Report the second joined result (the ForEach status).
            JustElem<1>());
      },
      NoWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); },
      MakeScopedArena(1024, &memory_allocator_));
  // The completion callback is expected to have fired by this point.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 1);
}
186 
// Checks that while ForEach is processing a value, the pipe does not accept
// another one: a second Push attempted from inside the action (and again
// after the action's promise has stalled for several polls) must be pending.
TEST_F(ForEachTest, NextResultHeldThroughCallback) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  MakeActivity(
      [&num_received] {
        Pipe<int> pipe;
        // Shared between the producer stages and the consumer action, which
        // both need to reach the sender.
        auto sender_box = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
        auto close_pipe = [sender_box] {
          sender_box->reset();
          return absl::OkStatus();
        };
        auto check_value = [&num_received, sender_box](int value) {
          // While a value is being processed, NextResult should be held,
          // which disallows new items from being pushed. The producer's
          // reset() stage should not have run yet either, because its Push()
          // should block until this code completes.
          EXPECT_TRUE((*sender_box)->Push(2)().pending());
          ++num_received;
          EXPECT_EQ(num_received, value);
          // Second stage of the action: repeat the same probe once the
          // first stage has finished; all of the above should still hold.
          auto recheck_blocked = [sender_box] {
            EXPECT_TRUE((*sender_box)->Push(2)().pending());
            return absl::OkStatus();
          };
          return TrySeq(
              // Has the side effect of stalling for some iterations.
              MoveableUntilPolled(), std::move(recheck_blocked));
        };
        return Map(
            Join(
                // Push one thing into the pipe, then close it.
                Seq((*sender_box)->Push(1), std::move(close_pipe)),
                // Read the value back out with a ForEach loop and verify it
                // is seen.
                ForEach(std::move(pipe.receiver), std::move(check_value))),
            // Report the second joined result (the ForEach status).
            JustElem<1>());
      },
      NoWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); },
      MakeScopedArena(1024, &memory_allocator_));
  // The completion callback is expected to have fired by this point.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 1);
}
237 
238 }  // namespace grpc_core
239 
main(int argc,char ** argv)240 int main(int argc, char** argv) {
241   ::testing::InitGoogleTest(&argc, argv);
242   return RUN_ALL_TESTS();
243 }
244