1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/for_each.h"
16
17 #include <memory>
18
19 #include "gmock/gmock.h"
20 #include "gtest/gtest.h"
21
22 #include <grpc/event_engine/memory_allocator.h>
23
24 #include "src/core/lib/gprpp/ref_counted_ptr.h"
25 #include "src/core/lib/promise/activity.h"
26 #include "src/core/lib/promise/inter_activity_pipe.h"
27 #include "src/core/lib/promise/join.h"
28 #include "src/core/lib/promise/map.h"
29 #include "src/core/lib/promise/pipe.h"
30 #include "src/core/lib/promise/seq.h"
31 #include "src/core/lib/promise/try_seq.h"
32 #include "src/core/lib/resource_quota/arena.h"
33 #include "src/core/lib/resource_quota/memory_quota.h"
34 #include "src/core/lib/resource_quota/resource_quota.h"
35 #include "test/core/promise/test_wakeup_schedulers.h"
36
37 using testing::Mock;
38 using testing::MockFunction;
39 using testing::StrictMock;
40
41 namespace grpc_core {
42
43 class ForEachTest : public ::testing::Test {
44 protected:
45 MemoryAllocator memory_allocator_ = MemoryAllocator(
46 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
47 };
48
// One activity both fills and drains a Pipe<int>: the first branch of the Join
// pushes 1, 2 and 3 and then destroys the sender, while the second branch
// runs ForEach over the receiver. The test expects the activity to finish with
// an OK status after the ForEach body has observed exactly three values, each
// equal to its 1-based arrival position.
TEST_F(ForEachTest, SendThriceWithPipe) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Hands the activity's final status to the strict mock.
  auto report_done = [&on_done](absl::Status result) {
    on_done.Call(std::move(result));
  };
  MakeActivity(
      [&num_received] {
        Pipe<int> channel;
        // The sender is kept in a shared, resettable holder so that each step
        // of the Seq can reach it and the last step can destroy it.
        auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(channel.sender)));
        // Last producer step: destroy the sender once all values are pushed.
        auto close_sender = [sender] {
          sender->reset();
          return absl::OkStatus();
        };
        // Consumer body: count the value and check that it matches the count.
        auto check_value = [&num_received](int i) {
          ++num_received;
          EXPECT_EQ(num_received, i);
          return absl::OkStatus();
        };
        // NOTE(review): JustElem<1>() presumably selects the ForEach branch's
        // result out of the Join -- confirm against map.h.
        return Map(
            Join(
                // Producer: 1, then 2, then 3, then close.
                Seq((*sender)->Push(1), [sender] { return (*sender)->Push(2); },
                    [sender] { return (*sender)->Push(3); },
                    std::move(close_sender)),
                // Consumer: read everything back out of the pipe.
                ForEach(std::move(channel.receiver), std::move(check_value))),
            JustElem<1>());
      },
      NoWakeupScheduler(), std::move(report_done),
      MakeScopedArena(1024, &memory_allocator_));
  // Verify the completion callback fired before checking the final count.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 3);
}
83
// Same scenario as the single-activity pipe test, but the producer and the
// consumer are two separate activities joined by an InterActivityPipe. The
// sending activity pushes 1, 2 and 3 and then moves the sender into a
// short-lived local; the receiving activity runs ForEach over the receiver.
// Both activities are expected to complete with an OK status and the consumer
// must have seen exactly three values, each equal to its arrival position.
TEST_F(ForEachTest, SendThriceWithInterActivityPipe) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done_sender;
  StrictMock<MockFunction<void(absl::Status)>> on_done_receiver;
  EXPECT_CALL(on_done_sender, Call(absl::OkStatus()));
  EXPECT_CALL(on_done_receiver, Call(absl::OkStatus()));
  // Completion callbacks: each forwards the final status to its strict mock.
  auto report_sender_done = [&on_done_sender](absl::Status result) {
    on_done_sender.Call(std::move(result));
  };
  auto report_receiver_done = [&on_done_receiver](absl::Status result) {
    on_done_receiver.Call(std::move(result));
  };
  InterActivityPipe<int, 1> channel;
  // The sending activity is held in a local for the rest of the test body.
  auto producer_activity = MakeActivity(
      Seq(
          // Producer: 1, then 2, then 3, then give up the sender.
          channel.sender.Push(1), [&channel] { return channel.sender.Push(2); },
          [&channel] { return channel.sender.Push(3); },
          [&channel] {
            // Moving the sender into a local destroys it at the end of this
            // step.
            auto dropped_sender = std::move(channel.sender);
            return absl::OkStatus();
          }),
      InlineWakeupScheduler{}, std::move(report_sender_done));
  MakeActivity(
      [&num_received, &channel] {
        // Consumer body: count the value and check that it matches the count.
        auto check_value = [&num_received](int i) {
          ++num_received;
          EXPECT_EQ(num_received, i);
          return absl::OkStatus();
        };
        return ForEach(std::move(channel.receiver), std::move(check_value));
      },
      NoWakeupScheduler(), std::move(report_receiver_done));
  // Both completion callbacks must have fired by now.
  Mock::VerifyAndClearExpectations(&on_done_sender);
  Mock::VerifyAndClearExpectations(&on_done_receiver);
  EXPECT_EQ(num_received, 3);
}
121
122 // Pollable type that stays movable until it's polled, then causes the test to
123 // fail if it's moved again.
124 // Promises have the property that they can be moved until polled, and this
125 // helps us check that the internals of ForEach respect this rule.
126 class MoveableUntilPolled {
127 public:
128 MoveableUntilPolled() = default;
129 MoveableUntilPolled(const MoveableUntilPolled&) = delete;
130 MoveableUntilPolled& operator=(const MoveableUntilPolled&) = delete;
MoveableUntilPolled(MoveableUntilPolled && other)131 MoveableUntilPolled(MoveableUntilPolled&& other) noexcept : polls_(0) {
132 EXPECT_EQ(other.polls_, 0);
133 }
operator =(MoveableUntilPolled && other)134 MoveableUntilPolled& operator=(MoveableUntilPolled&& other) noexcept {
135 EXPECT_EQ(other.polls_, 0);
136 polls_ = 0;
137 return *this;
138 }
139
operator ()()140 Poll<absl::Status> operator()() {
141 GetContext<Activity>()->ForceImmediateRepoll();
142 ++polls_;
143 if (polls_ == 10) return absl::OkStatus();
144 return Pending();
145 }
146
147 private:
148 int polls_ = 0;
149 };
150
// Pushes a single value through a Pipe<int> and has the ForEach body return a
// MoveableUntilPolled. That object stays Pending for several polls and fails
// the test if it is moved from after being polled, so this test checks that
// ForEach does not move the body's promise once polling has started.
TEST_F(ForEachTest, NoMoveAfterPoll) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Hands the activity's final status to the strict mock.
  auto report_done = [&on_done](absl::Status result) {
    on_done.Call(std::move(result));
  };
  MakeActivity(
      [&num_received] {
        Pipe<int> channel;
        // Shared, resettable holder so the closing step can destroy the
        // sender.
        auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(channel.sender)));
        // Producer's final step: destroy the sender.
        auto close_sender = [sender] {
          sender->reset();
          return absl::OkStatus();
        };
        // Consumer body: verify the value, then hand ForEach a promise that
        // detects being moved after it has been polled.
        auto check_then_stall = [&num_received](int i) {
          ++num_received;
          EXPECT_EQ(num_received, i);
          return MoveableUntilPolled();
        };
        return Map(
            Join(
                // Producer: push one value, then close.
                Seq((*sender)->Push(1), std::move(close_sender)),
                // Consumer: read the value back out of the pipe.
                ForEach(std::move(channel.receiver),
                        std::move(check_then_stall))),
            JustElem<1>());
      },
      NoWakeupScheduler(), std::move(report_done),
      MakeScopedArena(1024, &memory_allocator_));
  // Verify the completion callback fired before checking the final count.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 1);
}
186
// Checks that while ForEach is still processing an item, the pipe does not
// accept another one: a second Push attempted from inside the body must report
// pending, both when the body starts and again after the body's promise has
// stalled for a number of polls.
TEST_F(ForEachTest, NextResultHeldThroughCallback) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Hands the activity's final status to the strict mock.
  auto report_done = [&on_done](absl::Status result) {
    on_done.Call(std::move(result));
  };
  MakeActivity(
      [&num_received] {
        Pipe<int> channel;
        // Shared, resettable holder: used by the producer steps and also by
        // the consumer body, which probes the sender while handling an item.
        auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(channel.sender)));
        // Producer's final step: destroy the sender.
        auto close_sender = [sender] {
          sender->reset();
          return absl::OkStatus();
        };
        auto process_item = [&num_received, sender](int i) {
          // While this item is being handled, the NextResult should still be
          // held, so a new push cannot complete. The producer also should not
          // have reached its closing step yet, because its Push() is expected
          // to block until this body finishes.
          EXPECT_TRUE((*sender)->Push(2)().pending());
          ++num_received;
          EXPECT_EQ(num_received, i);
          return TrySeq(
              // Stalls the body for several polls before continuing.
              MoveableUntilPolled(), [sender] {
                // Repeat the probe after the stall: the same conditions
                // should still hold.
                EXPECT_TRUE((*sender)->Push(2)().pending());
                return absl::OkStatus();
              });
        };
        return Map(
            Join(
                // Producer: push one value, then close.
                Seq((*sender)->Push(1), std::move(close_sender)),
                // Consumer: read the value back out of the pipe.
                ForEach(std::move(channel.receiver), std::move(process_item))),
            JustElem<1>());
      },
      NoWakeupScheduler(), std::move(report_done),
      MakeScopedArena(1024, &memory_allocator_));
  // Verify the completion callback fired before checking the final count.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 1);
}
237
238 } // namespace grpc_core
239
main(int argc,char ** argv)240 int main(int argc, char** argv) {
241 ::testing::InitGoogleTest(&argc, argv);
242 return RUN_ALL_TESTS();
243 }
244