xref: /aosp_15_r20/external/grpc-grpc/test/core/promise/mpsc_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 
15 #include "src/core/lib/promise/mpsc.h"
16 
17 #include <memory>
18 #include <string>
19 
20 #include "absl/types/optional.h"
21 #include "gmock/gmock.h"
22 #include "gtest/gtest.h"
23 
24 #include <grpc/support/log.h>
25 
26 #include "src/core/lib/promise/activity.h"
27 #include "src/core/lib/promise/promise.h"
28 
29 using testing::Mock;
30 using testing::StrictMock;
31 
32 namespace grpc_core {
33 namespace {
34 
35 class MockActivity : public Activity, public Wakeable {
36  public:
37   MOCK_METHOD(void, WakeupRequested, ());
38 
ForceImmediateRepoll(WakeupMask)39   void ForceImmediateRepoll(WakeupMask) override { WakeupRequested(); }
Orphan()40   void Orphan() override {}
MakeOwningWaker()41   Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()42   Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)43   void Wakeup(WakeupMask) override { WakeupRequested(); }
WakeupAsync(WakeupMask)44   void WakeupAsync(WakeupMask) override { WakeupRequested(); }
Drop(WakeupMask)45   void Drop(WakeupMask) override {}
DebugTag() const46   std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const47   std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
48 
Activate()49   void Activate() {
50     if (scoped_activity_ != nullptr) return;
51     scoped_activity_ = std::make_unique<ScopedActivity>(this);
52   }
53 
Deactivate()54   void Deactivate() { scoped_activity_.reset(); }
55 
56  private:
57   std::unique_ptr<ScopedActivity> scoped_activity_;
58 };
59 
// Move-only value sent through the queue in these tests.
//
// Equality compares the pointed-to integers rather than the pointers: two
// payloads are equal when both are empty, or when both hold the same value.
struct Payload {
  std::unique_ptr<int> x;

  bool operator==(const Payload& other) const {
    // If either side is empty, they are equal only when both are empty.
    if (x == nullptr || other.x == nullptr) {
      return x == nullptr && other.x == nullptr;
    }
    return *x == *other.x;
  }
};
MakePayload(int value)67 Payload MakePayload(int value) { return {std::make_unique<int>(value)}; }
68 
// A receiver can be constructed and destroyed without ever being used.
TEST(MpscTest, NoOp) { MpscReceiver<Payload> receiver(1); }
70 
// A sender can be created from a receiver and both can be torn down without
// sending anything.
TEST(MpscTest, MakeSender) {
  MpscReceiver<Payload> receiver(1);
  MpscSender<Payload> sender = receiver.MakeSender();
}
75 
// The first Send on a fresh queue resolves immediately with `true`.
TEST(MpscTest, SendOneThingInstantly) {
  MpscReceiver<Payload> receiver(1);
  MpscSender<Payload> sender = receiver.MakeSender();
  EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
}
81 
// A value that was sent immediately can also be received immediately, and
// arrives with the same contents.
TEST(MpscTest, SendOneThingInstantlyAndReceiveInstantly) {
  MpscReceiver<Payload> receiver(1);
  MpscSender<Payload> sender = receiver.MakeSender();
  EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
  EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1));
}
88 
// With a receiver constructed with 1, the first Send resolves immediately
// but the second one does not complete (NowOrNever yields nullopt).
TEST(MpscTest, SendingLotsOfThingsGivesPushback) {
  StrictMock<MockActivity> activity1;
  MpscReceiver<Payload> receiver(1);
  MpscSender<Payload> sender = receiver.MakeSender();

  activity1.Activate();
  EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
  EXPECT_EQ(NowOrNever(sender.Send(MakePayload(2))), absl::nullopt);
  activity1.Deactivate();

  // Deliberately set last: because activity1 is a StrictMock, any
  // WakeupRequested() call before this point would already have failed the
  // test. The expected call therefore has to happen while the locals above
  // are destroyed at scope exit.
  // NOTE(review): presumably tearing down the receiver wakes the sender that
  // was left pending -- confirm against mpsc.h.
  EXPECT_CALL(activity1, WakeupRequested());
}
101 
// Walks a blocked sender and a blocked receiver through a full hand-off:
// activity1 plays the sending side, activity2 the receiving side, and each
// is expected to be woken by progress made on the other side.
TEST(MpscTest, ReceivingAfterBlockageWakesUp) {
  StrictMock<MockActivity> activity1;
  StrictMock<MockActivity> activity2;
  MpscReceiver<Payload> receiver(1);
  MpscSender<Payload> sender = receiver.MakeSender();

  // Phase 1 (sender): the first send completes, the second stays pending.
  activity1.Activate();
  EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true);
  auto send2 = sender.Send(MakePayload(2));
  EXPECT_EQ(send2(), Poll<bool>(Pending{}));
  activity1.Deactivate();

  // Phase 2 (receiver): taking payload 1 must wake the pending sender; a
  // second receive then finds nothing and stays pending.
  activity2.Activate();
  EXPECT_CALL(activity1, WakeupRequested());
  EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1));
  Mock::VerifyAndClearExpectations(&activity1);
  auto receive2 = receiver.Next();
  EXPECT_EQ(receive2(), Poll<Payload>(Pending{}));
  activity2.Deactivate();

  // Phase 3 (sender): re-polling the pending send now succeeds and must wake
  // the pending receiver.
  activity1.Activate();
  EXPECT_CALL(activity2, WakeupRequested());
  EXPECT_EQ(send2(), Poll<bool>(true));
  Mock::VerifyAndClearExpectations(&activity2);
  activity1.Deactivate();

  // Phase 4 (receiver): re-polling the pending receive yields payload 2.
  activity2.Activate();
  EXPECT_EQ(receive2(), Poll<Payload>(MakePayload(2)));
  activity2.Deactivate();
}
132 
// With a receiver constructed with 50, 25 sends in a row all resolve
// immediately, and the values are then received in the order they were sent.
TEST(MpscTest, BigBufferAllowsBurst) {
  MpscReceiver<Payload> receiver(50);
  MpscSender<Payload> sender = receiver.MakeSender();

  for (int i = 0; i < 25; i++) {
    EXPECT_EQ(NowOrNever(sender.Send(MakePayload(i))), true);
  }
  for (int i = 0; i < 25; i++) {
    EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(i));
  }
}
144 
// Once the receiver has been destroyed, a Send through a sender created from
// it resolves immediately with `false`.
TEST(MpscTest, ClosureIsVisibleToSenders) {
  auto receiver = std::make_unique<MpscReceiver<Payload>>(1);
  MpscSender<Payload> sender = receiver->MakeSender();
  receiver.reset();
  EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), false);
}
151 
// UnbufferedImmediateSend accepts seven payloads in a row on a receiver
// constructed with 1, with no activity installed while sending. All seven
// are then received in send order, after which a further receive is pending.
TEST(MpscTest, ImmediateSendWorks) {
  StrictMock<MockActivity> activity;
  MpscReceiver<Payload> receiver(1);
  MpscSender<Payload> sender = receiver.MakeSender();

  constexpr int kNumPayloads = 7;

  // Payload values run 1..kNumPayloads.
  for (int i = 1; i <= kNumPayloads; i++) {
    EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(i)), true)
        << "sending payload " << i;
  }

  activity.Activate();
  for (int i = 1; i <= kNumPayloads; i++) {
    EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(i))
        << "receiving payload " << i;
  }
  // Everything has been received, so one more receive has nothing to return.
  auto receive2 = receiver.Next();
  EXPECT_EQ(receive2(), Poll<Payload>(Pending{}));
  activity.Deactivate();
}
177 
178 }  // namespace
179 }  // namespace grpc_core
180 
main(int argc,char ** argv)181 int main(int argc, char** argv) {
182   gpr_log_verbosity_init();
183   ::testing::InitGoogleTest(&argc, argv);
184   return RUN_ALL_TESTS();
185 }
186