xref: /aosp_15_r20/external/pigweed/pw_rpc/nanopb/callback_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2023 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/nanopb/client_testing.h"
16*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc_test_protos/test.rpc.pb.h"
17*61c4878aSAndroid Build Coastguard Worker #include "pw_sync/binary_semaphore.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/non_portable_test_thread_options.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/sleep.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/thread.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/yield.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_unit_test/framework.h"
23*61c4878aSAndroid Build Coastguard Worker 
24*61c4878aSAndroid Build Coastguard Worker namespace pw::rpc {
25*61c4878aSAndroid Build Coastguard Worker namespace {
26*61c4878aSAndroid Build Coastguard Worker 
27*61c4878aSAndroid Build Coastguard Worker using namespace std::chrono_literals;
28*61c4878aSAndroid Build Coastguard Worker 
29*61c4878aSAndroid Build Coastguard Worker using test::pw_rpc::nanopb::TestService;
30*61c4878aSAndroid Build Coastguard Worker 
31*61c4878aSAndroid Build Coastguard Worker using ClientReaderWriter =
32*61c4878aSAndroid Build Coastguard Worker     NanopbClientReaderWriter<pw_rpc_test_TestRequest,
33*61c4878aSAndroid Build Coastguard Worker                              pw_rpc_test_TestStreamResponse>;
34*61c4878aSAndroid Build Coastguard Worker 
35*61c4878aSAndroid Build Coastguard Worker // These tests cover interactions between a thread moving or destroying an RPC
36*61c4878aSAndroid Build Coastguard Worker // call object and a thread running callbacks for that call. In order to test
37*61c4878aSAndroid Build Coastguard Worker // that the first thread waits for callbacks to complete when trying to move or
38*61c4878aSAndroid Build Coastguard Worker // destroy the call, it is necessary to have the callback thread yield to the
39*61c4878aSAndroid Build Coastguard Worker // other thread. There isn't a good way to synchronize these threads without
40*61c4878aSAndroid Build Coastguard Worker // changing the code under test.
YieldToOtherThread()41*61c4878aSAndroid Build Coastguard Worker void YieldToOtherThread() {
42*61c4878aSAndroid Build Coastguard Worker   // Sleep for a while and then yield just to be sure the other thread runs.
43*61c4878aSAndroid Build Coastguard Worker   this_thread::sleep_for(100ms);
44*61c4878aSAndroid Build Coastguard Worker   this_thread::yield();
45*61c4878aSAndroid Build Coastguard Worker }
46*61c4878aSAndroid Build Coastguard Worker 
47*61c4878aSAndroid Build Coastguard Worker class CallbacksTest : public ::testing::Test {
48*61c4878aSAndroid Build Coastguard Worker  protected:
CallbacksTest()49*61c4878aSAndroid Build Coastguard Worker   CallbacksTest()
50*61c4878aSAndroid Build Coastguard Worker       // TODO: b/290860904 - Replace TestOptionsThread0 with
51*61c4878aSAndroid Build Coastguard Worker       // TestThreadContext.
52*61c4878aSAndroid Build Coastguard Worker       : callback_thread_(thread::test::TestOptionsThread0(),
53*61c4878aSAndroid Build Coastguard Worker                          [this] { SendResponseAfterSemaphore(); }) {}
54*61c4878aSAndroid Build Coastguard Worker 
~CallbacksTest()55*61c4878aSAndroid Build Coastguard Worker   ~CallbacksTest() override {
56*61c4878aSAndroid Build Coastguard Worker     EXPECT_FALSE(callback_thread_.joinable());  // Tests must join the thread!
57*61c4878aSAndroid Build Coastguard Worker   }
58*61c4878aSAndroid Build Coastguard Worker 
RespondToCall(const ClientReaderWriter & call)59*61c4878aSAndroid Build Coastguard Worker   void RespondToCall(const ClientReaderWriter& call) {
60*61c4878aSAndroid Build Coastguard Worker     respond_to_call_ = &call;
61*61c4878aSAndroid Build Coastguard Worker   }
62*61c4878aSAndroid Build Coastguard Worker 
63*61c4878aSAndroid Build Coastguard Worker   NanopbClientTestContext<> context_;
64*61c4878aSAndroid Build Coastguard Worker   sync::BinarySemaphore callback_thread_sem_;
65*61c4878aSAndroid Build Coastguard Worker   sync::BinarySemaphore main_thread_sem_;
66*61c4878aSAndroid Build Coastguard Worker 
67*61c4878aSAndroid Build Coastguard Worker   Thread callback_thread_;
68*61c4878aSAndroid Build Coastguard Worker 
69*61c4878aSAndroid Build Coastguard Worker   // Must be incremented exactly once by the RPC callback in each test.
70*61c4878aSAndroid Build Coastguard Worker   volatile int callback_executed_ = 0;
71*61c4878aSAndroid Build Coastguard Worker 
72*61c4878aSAndroid Build Coastguard Worker   // Variables optionally used by tests. These are in this object so lambads
73*61c4878aSAndroid Build Coastguard Worker   // only need to capture [this] to access them.
74*61c4878aSAndroid Build Coastguard Worker   volatile bool call_is_in_scope_ = false;
75*61c4878aSAndroid Build Coastguard Worker 
76*61c4878aSAndroid Build Coastguard Worker   ClientReaderWriter call_1_;
77*61c4878aSAndroid Build Coastguard Worker   ClientReaderWriter call_2_;
78*61c4878aSAndroid Build Coastguard Worker 
79*61c4878aSAndroid Build Coastguard Worker  private:
SendResponseAfterSemaphore()80*61c4878aSAndroid Build Coastguard Worker   void SendResponseAfterSemaphore() {
81*61c4878aSAndroid Build Coastguard Worker     // Wait until the main thread says to send the response.
82*61c4878aSAndroid Build Coastguard Worker     callback_thread_sem_.acquire();
83*61c4878aSAndroid Build Coastguard Worker 
84*61c4878aSAndroid Build Coastguard Worker     context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
85*61c4878aSAndroid Build Coastguard Worker         {}, respond_to_call_->id());
86*61c4878aSAndroid Build Coastguard Worker   }
87*61c4878aSAndroid Build Coastguard Worker 
88*61c4878aSAndroid Build Coastguard Worker   const ClientReaderWriter* respond_to_call_ = &call_1_;
89*61c4878aSAndroid Build Coastguard Worker };
90*61c4878aSAndroid Build Coastguard Worker 
TEST_F(CallbacksTest,DestructorWaitsUntilCallbacksComplete)91*61c4878aSAndroid Build Coastguard Worker TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
92*61c4878aSAndroid Build Coastguard Worker   // Skip this test if locks are disabled because the thread can't yield.
93*61c4878aSAndroid Build Coastguard Worker   if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
94*61c4878aSAndroid Build Coastguard Worker     callback_thread_sem_.release();
95*61c4878aSAndroid Build Coastguard Worker     callback_thread_.join();
96*61c4878aSAndroid Build Coastguard Worker     GTEST_SKIP();
97*61c4878aSAndroid Build Coastguard Worker   }
98*61c4878aSAndroid Build Coastguard Worker 
99*61c4878aSAndroid Build Coastguard Worker   {
100*61c4878aSAndroid Build Coastguard Worker     ClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
101*61c4878aSAndroid Build Coastguard Worker         context_.client(), context_.channel().id());
102*61c4878aSAndroid Build Coastguard Worker     RespondToCall(local_call);
103*61c4878aSAndroid Build Coastguard Worker 
104*61c4878aSAndroid Build Coastguard Worker     call_is_in_scope_ = true;
105*61c4878aSAndroid Build Coastguard Worker 
106*61c4878aSAndroid Build Coastguard Worker     local_call.set_on_next([this](const pw_rpc_test_TestStreamResponse&) {
107*61c4878aSAndroid Build Coastguard Worker       main_thread_sem_.release();
108*61c4878aSAndroid Build Coastguard Worker 
109*61c4878aSAndroid Build Coastguard Worker       // Wait for a while so the main thread tries to destroy the call.
110*61c4878aSAndroid Build Coastguard Worker       YieldToOtherThread();
111*61c4878aSAndroid Build Coastguard Worker 
112*61c4878aSAndroid Build Coastguard Worker       // Now, make sure the call is still in scope. The main thread should
113*61c4878aSAndroid Build Coastguard Worker       // block in the call's destructor until this callback completes.
114*61c4878aSAndroid Build Coastguard Worker       EXPECT_TRUE(call_is_in_scope_);
115*61c4878aSAndroid Build Coastguard Worker 
116*61c4878aSAndroid Build Coastguard Worker       callback_executed_ = callback_executed_ + 1;
117*61c4878aSAndroid Build Coastguard Worker     });
118*61c4878aSAndroid Build Coastguard Worker 
119*61c4878aSAndroid Build Coastguard Worker     // Start the callback thread so it can invoke the callback.
120*61c4878aSAndroid Build Coastguard Worker     callback_thread_sem_.release();
121*61c4878aSAndroid Build Coastguard Worker 
122*61c4878aSAndroid Build Coastguard Worker     // Wait until the callback thread starts.
123*61c4878aSAndroid Build Coastguard Worker     main_thread_sem_.acquire();
124*61c4878aSAndroid Build Coastguard Worker   }
125*61c4878aSAndroid Build Coastguard Worker 
126*61c4878aSAndroid Build Coastguard Worker   // The callback thread will sleep for a bit. Meanwhile, let the call go out
127*61c4878aSAndroid Build Coastguard Worker   // of scope, and mark it as such.
128*61c4878aSAndroid Build Coastguard Worker   call_is_in_scope_ = false;
129*61c4878aSAndroid Build Coastguard Worker 
130*61c4878aSAndroid Build Coastguard Worker   // Wait for the callback thread to finish.
131*61c4878aSAndroid Build Coastguard Worker   callback_thread_.join();
132*61c4878aSAndroid Build Coastguard Worker 
133*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(callback_executed_, 1);
134*61c4878aSAndroid Build Coastguard Worker }
135*61c4878aSAndroid Build Coastguard Worker 
TEST_F(CallbacksTest,MoveActiveCall_WaitsForCallbackToComplete)136*61c4878aSAndroid Build Coastguard Worker TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
137*61c4878aSAndroid Build Coastguard Worker   // Skip this test if locks are disabled because the thread can't yield.
138*61c4878aSAndroid Build Coastguard Worker   if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
139*61c4878aSAndroid Build Coastguard Worker     callback_thread_sem_.release();
140*61c4878aSAndroid Build Coastguard Worker     callback_thread_.join();
141*61c4878aSAndroid Build Coastguard Worker     GTEST_SKIP();
142*61c4878aSAndroid Build Coastguard Worker   }
143*61c4878aSAndroid Build Coastguard Worker 
144*61c4878aSAndroid Build Coastguard Worker   call_1_ = TestService::TestBidirectionalStreamRpc(
145*61c4878aSAndroid Build Coastguard Worker       context_.client(),
146*61c4878aSAndroid Build Coastguard Worker       context_.channel().id(),
147*61c4878aSAndroid Build Coastguard Worker       [this](const pw_rpc_test_TestStreamResponse&) {
148*61c4878aSAndroid Build Coastguard Worker         main_thread_sem_.release();  // Confirm that this thread started
149*61c4878aSAndroid Build Coastguard Worker 
150*61c4878aSAndroid Build Coastguard Worker         YieldToOtherThread();
151*61c4878aSAndroid Build Coastguard Worker 
152*61c4878aSAndroid Build Coastguard Worker         callback_executed_ = callback_executed_ + 1;
153*61c4878aSAndroid Build Coastguard Worker       });
154*61c4878aSAndroid Build Coastguard Worker 
155*61c4878aSAndroid Build Coastguard Worker   // Start the callback thread so it can invoke the callback.
156*61c4878aSAndroid Build Coastguard Worker   callback_thread_sem_.release();
157*61c4878aSAndroid Build Coastguard Worker 
158*61c4878aSAndroid Build Coastguard Worker   // Confirm that the callback thread started.
159*61c4878aSAndroid Build Coastguard Worker   main_thread_sem_.acquire();
160*61c4878aSAndroid Build Coastguard Worker 
161*61c4878aSAndroid Build Coastguard Worker   // Move the call object. This thread should wait until the on_completed
162*61c4878aSAndroid Build Coastguard Worker   // callback is done.
163*61c4878aSAndroid Build Coastguard Worker   EXPECT_TRUE(call_1_.active());
164*61c4878aSAndroid Build Coastguard Worker   call_2_ = std::move(call_1_);
165*61c4878aSAndroid Build Coastguard Worker 
166*61c4878aSAndroid Build Coastguard Worker   // The callback should already have finished. This thread should have waited
167*61c4878aSAndroid Build Coastguard Worker   // for it to finish during the move.
168*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(callback_executed_, 1);
169*61c4878aSAndroid Build Coastguard Worker   EXPECT_FALSE(call_1_.active());
170*61c4878aSAndroid Build Coastguard Worker   EXPECT_TRUE(call_2_.active());
171*61c4878aSAndroid Build Coastguard Worker 
172*61c4878aSAndroid Build Coastguard Worker   callback_thread_.join();
173*61c4878aSAndroid Build Coastguard Worker }
174*61c4878aSAndroid Build Coastguard Worker 
// Checks that a callback running for call_1_ can move a different call
// (call_2_) into call_1_.
TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(),
      context_.channel().id(),
      [this](const pw_rpc_test_TestStreamResponse&) {
        // Signal that the callback is running.
        main_thread_sem_.release();

        call_1_ = std::move(call_2_);

        callback_executed_ = callback_executed_ + 1;
      });

  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  // Both calls are active before the callback runs.
  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Release the callback thread, then block until it is done.
  callback_thread_sem_.release();
  callback_thread_.join();

  // call_1_ now holds what used to be call_2_.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_TRUE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
201*61c4878aSAndroid Build Coastguard Worker 
// Checks that a callback can move its own call object (call_1_) into another
// call (call_2_), provided it cancels the call first.
TEST_F(CallbacksTest, MoveOwnCallInCallback) {
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(),
      context_.channel().id(),
      [this](const pw_rpc_test_TestStreamResponse&) {
        // Signal that the callback is running.
        main_thread_sem_.release();

        // The call has to be cancelled before it is moved. Moving an active
        // call makes the moving thread wait for the callback thread to
        // finish, and here both are this same thread, so it would deadlock.
        EXPECT_EQ(OkStatus(), call_1_.Cancel());
        call_2_ = std::move(call_1_);

        callback_executed_ = callback_executed_ + 1;
      });

  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  // Both calls are active before the callback runs.
  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Release the callback thread, then block until it is done.
  callback_thread_sem_.release();
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
  EXPECT_FALSE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
232*61c4878aSAndroid Build Coastguard Worker 
// Checks that stream packets arriving while the call's on_next callback is
// still running are dropped rather than invoking the callback again.
TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(),
      context_.channel().id(),
      [this](const pw_rpc_test_TestStreamResponse&) {
        // Signal that the callback is running.
        main_thread_sem_.release();

        // Stay inside the callback until the main thread releases us.
        callback_thread_sem_.acquire();

        callback_executed_ = callback_executed_ + 1;
      });

  // Let the callback thread go.
  callback_thread_sem_.release();

  // Hold here until the callback is confirmed to be running.
  main_thread_sem_.acquire();

  // Deliver a handful of packets for this call. on_next is busy, so they
  // should all be dropped and callback_executed_ should end up at 1.
  constexpr int kPacketsToSend = 5;
  for (int sent = 0; sent < kPacketsToSend; ++sent) {
    context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
        {}, call_1_.id());
  }

  // Unblock the callback and wait for the callback thread to finish.
  callback_thread_sem_.release();
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
}
263*61c4878aSAndroid Build Coastguard Worker 
264*61c4878aSAndroid Build Coastguard Worker }  // namespace
265*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::rpc
266