1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/mpsc.h"
16
17 #include <memory>
18 #include <string>
19
20 #include "absl/types/optional.h"
21 #include "gmock/gmock.h"
22 #include "gtest/gtest.h"
23
24 #include <grpc/support/log.h>
25
26 #include "src/core/lib/promise/activity.h"
27 #include "src/core/lib/promise/promise.h"
28
29 using testing::Mock;
30 using testing::StrictMock;
31
32 namespace grpc_core {
33 namespace {
34
35 class MockActivity : public Activity, public Wakeable {
36 public:
37 MOCK_METHOD(void, WakeupRequested, ());
38
ForceImmediateRepoll(WakeupMask)39 void ForceImmediateRepoll(WakeupMask) override { WakeupRequested(); }
Orphan()40 void Orphan() override {}
MakeOwningWaker()41 Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()42 Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)43 void Wakeup(WakeupMask) override { WakeupRequested(); }
WakeupAsync(WakeupMask)44 void WakeupAsync(WakeupMask) override { WakeupRequested(); }
Drop(WakeupMask)45 void Drop(WakeupMask) override {}
DebugTag() const46 std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const47 std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
48
Activate()49 void Activate() {
50 if (scoped_activity_ != nullptr) return;
51 scoped_activity_ = std::make_unique<ScopedActivity>(this);
52 }
53
Deactivate()54 void Deactivate() { scoped_activity_.reset(); }
55
56 private:
57 std::unique_ptr<ScopedActivity> scoped_activity_;
58 };
59
60 struct Payload {
61 std::unique_ptr<int> x;
operator ==grpc_core::__anonc0a62ad60111::Payload62 bool operator==(const Payload& other) const {
63 return (x == nullptr && other.x == nullptr) ||
64 (x != nullptr && other.x != nullptr && *x == *other.x);
65 }
66 };
MakePayload(int value)67 Payload MakePayload(int value) { return {std::make_unique<int>(value)}; }
68
TEST(MpscTest,NoOp)69 TEST(MpscTest, NoOp) { MpscReceiver<Payload> receiver(1); }
70
TEST(MpscTest,MakeSender)71 TEST(MpscTest, MakeSender) {
72 MpscReceiver<Payload> receiver(1);
73 MpscSender<Payload> sender = receiver.MakeSender();
74 }
75
TEST(MpscTest,SendOneThingInstantly)76 TEST(MpscTest, SendOneThingInstantly) {
77 MpscReceiver<Payload> receiver(1);
78 MpscSender<Payload> sender = receiver.MakeSender();
79 EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
80 }
81
TEST(MpscTest,SendOneThingInstantlyAndReceiveInstantly)82 TEST(MpscTest, SendOneThingInstantlyAndReceiveInstantly) {
83 MpscReceiver<Payload> receiver(1);
84 MpscSender<Payload> sender = receiver.MakeSender();
85 EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
86 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1));
87 }
88
TEST(MpscTest,SendingLotsOfThingsGivesPushback)89 TEST(MpscTest, SendingLotsOfThingsGivesPushback) {
90 StrictMock<MockActivity> activity1;
91 MpscReceiver<Payload> receiver(1);
92 MpscSender<Payload> sender = receiver.MakeSender();
93
94 activity1.Activate();
95 EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
96 EXPECT_EQ(NowOrNever(sender.Send(MakePayload(2))), absl::nullopt);
97 activity1.Deactivate();
98
99 EXPECT_CALL(activity1, WakeupRequested());
100 }
101
TEST(MpscTest,ReceivingAfterBlockageWakesUp)102 TEST(MpscTest, ReceivingAfterBlockageWakesUp) {
103 StrictMock<MockActivity> activity1;
104 StrictMock<MockActivity> activity2;
105 MpscReceiver<Payload> receiver(1);
106 MpscSender<Payload> sender = receiver.MakeSender();
107
108 activity1.Activate();
109 EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
110 auto send2 = sender.Send(MakePayload(2));
111 EXPECT_EQ(send2(), Poll<bool>(Pending{}));
112 activity1.Deactivate();
113
114 activity2.Activate();
115 EXPECT_CALL(activity1, WakeupRequested());
116 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1));
117 Mock::VerifyAndClearExpectations(&activity1);
118 auto receive2 = receiver.Next();
119 EXPECT_EQ(receive2(), Poll<Payload>(Pending{}));
120 activity2.Deactivate();
121
122 activity1.Activate();
123 EXPECT_CALL(activity2, WakeupRequested());
124 EXPECT_EQ(send2(), Poll<bool>(true));
125 Mock::VerifyAndClearExpectations(&activity2);
126 activity1.Deactivate();
127
128 activity2.Activate();
129 EXPECT_EQ(receive2(), Poll<Payload>(MakePayload(2)));
130 activity2.Deactivate();
131 }
132
TEST(MpscTest,BigBufferAllowsBurst)133 TEST(MpscTest, BigBufferAllowsBurst) {
134 MpscReceiver<Payload> receiver(50);
135 MpscSender<Payload> sender = receiver.MakeSender();
136
137 for (int i = 0; i < 25; i++) {
138 EXPECT_EQ(NowOrNever(sender.Send(MakePayload(i))), true);
139 }
140 for (int i = 0; i < 25; i++) {
141 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(i));
142 }
143 }
144
TEST(MpscTest,ClosureIsVisibleToSenders)145 TEST(MpscTest, ClosureIsVisibleToSenders) {
146 auto receiver = std::make_unique<MpscReceiver<Payload>>(1);
147 MpscSender<Payload> sender = receiver->MakeSender();
148 receiver.reset();
149 EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), false);
150 }
151
TEST(MpscTest,ImmediateSendWorks)152 TEST(MpscTest, ImmediateSendWorks) {
153 StrictMock<MockActivity> activity;
154 MpscReceiver<Payload> receiver(1);
155 MpscSender<Payload> sender = receiver.MakeSender();
156
157 EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(1)), true);
158 EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(2)), true);
159 EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(3)), true);
160 EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(4)), true);
161 EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(5)), true);
162 EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(6)), true);
163 EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(7)), true);
164
165 activity.Activate();
166 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1));
167 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(2));
168 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(3));
169 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(4));
170 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(5));
171 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(6));
172 EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(7));
173 auto receive2 = receiver.Next();
174 EXPECT_EQ(receive2(), Poll<Payload>(Pending{}));
175 activity.Deactivate();
176 }
177
178 } // namespace
179 } // namespace grpc_core
180
main(int argc,char ** argv)181 int main(int argc, char** argv) {
182 gpr_log_verbosity_init();
183 ::testing::InitGoogleTest(&argc, argv);
184 return RUN_ALL_TESTS();
185 }
186