xref: /aosp_15_r20/external/pigweed/pw_rpc/nanopb/callback_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/nanopb/client_testing.h"
16 #include "pw_rpc_test_protos/test.rpc.pb.h"
17 #include "pw_sync/binary_semaphore.h"
18 #include "pw_thread/non_portable_test_thread_options.h"
19 #include "pw_thread/sleep.h"
20 #include "pw_thread/thread.h"
21 #include "pw_thread/yield.h"
22 #include "pw_unit_test/framework.h"
23 
24 namespace pw::rpc {
25 namespace {
26 
27 using namespace std::chrono_literals;
28 
29 using test::pw_rpc::nanopb::TestService;
30 
31 using ClientReaderWriter =
32     NanopbClientReaderWriter<pw_rpc_test_TestRequest,
33                              pw_rpc_test_TestStreamResponse>;
34 
35 // These tests cover interactions between a thread moving or destroying an RPC
36 // call object and a thread running callbacks for that call. In order to test
37 // that the first thread waits for callbacks to complete when trying to move or
38 // destroy the call, it is necessary to have the callback thread yield to the
39 // other thread. There isn't a good way to synchronize these threads without
40 // changing the code under test.
YieldToOtherThread()41 void YieldToOtherThread() {
42   // Sleep for a while and then yield just to be sure the other thread runs.
43   this_thread::sleep_for(100ms);
44   this_thread::yield();
45 }
46 
47 class CallbacksTest : public ::testing::Test {
48  protected:
CallbacksTest()49   CallbacksTest()
50       // TODO: b/290860904 - Replace TestOptionsThread0 with
51       // TestThreadContext.
52       : callback_thread_(thread::test::TestOptionsThread0(),
53                          [this] { SendResponseAfterSemaphore(); }) {}
54 
~CallbacksTest()55   ~CallbacksTest() override {
56     EXPECT_FALSE(callback_thread_.joinable());  // Tests must join the thread!
57   }
58 
RespondToCall(const ClientReaderWriter & call)59   void RespondToCall(const ClientReaderWriter& call) {
60     respond_to_call_ = &call;
61   }
62 
63   NanopbClientTestContext<> context_;
64   sync::BinarySemaphore callback_thread_sem_;
65   sync::BinarySemaphore main_thread_sem_;
66 
67   Thread callback_thread_;
68 
69   // Must be incremented exactly once by the RPC callback in each test.
70   volatile int callback_executed_ = 0;
71 
72   // Variables optionally used by tests. These are in this object so lambads
73   // only need to capture [this] to access them.
74   volatile bool call_is_in_scope_ = false;
75 
76   ClientReaderWriter call_1_;
77   ClientReaderWriter call_2_;
78 
79  private:
SendResponseAfterSemaphore()80   void SendResponseAfterSemaphore() {
81     // Wait until the main thread says to send the response.
82     callback_thread_sem_.acquire();
83 
84     context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
85         {}, respond_to_call_->id());
86   }
87 
88   const ClientReaderWriter* respond_to_call_ = &call_1_;
89 };
90 
TEST_F(CallbacksTest,DestructorWaitsUntilCallbacksComplete)91 TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
92   // Skip this test if locks are disabled because the thread can't yield.
93   if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
94     callback_thread_sem_.release();
95     callback_thread_.join();
96     GTEST_SKIP();
97   }
98 
99   {
100     ClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
101         context_.client(), context_.channel().id());
102     RespondToCall(local_call);
103 
104     call_is_in_scope_ = true;
105 
106     local_call.set_on_next([this](const pw_rpc_test_TestStreamResponse&) {
107       main_thread_sem_.release();
108 
109       // Wait for a while so the main thread tries to destroy the call.
110       YieldToOtherThread();
111 
112       // Now, make sure the call is still in scope. The main thread should
113       // block in the call's destructor until this callback completes.
114       EXPECT_TRUE(call_is_in_scope_);
115 
116       callback_executed_ = callback_executed_ + 1;
117     });
118 
119     // Start the callback thread so it can invoke the callback.
120     callback_thread_sem_.release();
121 
122     // Wait until the callback thread starts.
123     main_thread_sem_.acquire();
124   }
125 
126   // The callback thread will sleep for a bit. Meanwhile, let the call go out
127   // of scope, and mark it as such.
128   call_is_in_scope_ = false;
129 
130   // Wait for the callback thread to finish.
131   callback_thread_.join();
132 
133   EXPECT_EQ(callback_executed_, 1);
134 }
135 
TEST_F(CallbacksTest,MoveActiveCall_WaitsForCallbackToComplete)136 TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
137   // Skip this test if locks are disabled because the thread can't yield.
138   if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
139     callback_thread_sem_.release();
140     callback_thread_.join();
141     GTEST_SKIP();
142   }
143 
144   call_1_ = TestService::TestBidirectionalStreamRpc(
145       context_.client(),
146       context_.channel().id(),
147       [this](const pw_rpc_test_TestStreamResponse&) {
148         main_thread_sem_.release();  // Confirm that this thread started
149 
150         YieldToOtherThread();
151 
152         callback_executed_ = callback_executed_ + 1;
153       });
154 
155   // Start the callback thread so it can invoke the callback.
156   callback_thread_sem_.release();
157 
158   // Confirm that the callback thread started.
159   main_thread_sem_.acquire();
160 
161   // Move the call object. This thread should wait until the on_completed
162   // callback is done.
163   EXPECT_TRUE(call_1_.active());
164   call_2_ = std::move(call_1_);
165 
166   // The callback should already have finished. This thread should have waited
167   // for it to finish during the move.
168   EXPECT_EQ(callback_executed_, 1);
169   EXPECT_FALSE(call_1_.active());
170   EXPECT_TRUE(call_2_.active());
171 
172   callback_thread_.join();
173 }
174 
TEST_F(CallbacksTest,MoveOtherCallIntoOwnCallInCallback)175 TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
176   call_1_ = TestService::TestBidirectionalStreamRpc(
177       context_.client(),
178       context_.channel().id(),
179       [this](const pw_rpc_test_TestStreamResponse&) {
180         main_thread_sem_.release();  // Confirm that this thread started
181 
182         call_1_ = std::move(call_2_);
183 
184         callback_executed_ = callback_executed_ + 1;
185       });
186 
187   call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
188                                                     context_.channel().id());
189 
190   EXPECT_TRUE(call_1_.active());
191   EXPECT_TRUE(call_2_.active());
192 
193   // Start the callback thread and wait for it to finish.
194   callback_thread_sem_.release();
195   callback_thread_.join();
196 
197   EXPECT_EQ(callback_executed_, 1);
198   EXPECT_TRUE(call_1_.active());
199   EXPECT_FALSE(call_2_.active());
200 }
201 
TEST_F(CallbacksTest,MoveOwnCallInCallback)202 TEST_F(CallbacksTest, MoveOwnCallInCallback) {
203   call_1_ = TestService::TestBidirectionalStreamRpc(
204       context_.client(),
205       context_.channel().id(),
206       [this](const pw_rpc_test_TestStreamResponse&) {
207         main_thread_sem_.release();  // Confirm that this thread started
208 
209         // Cancel this call first, or the move will deadlock, since the moving
210         // thread will wait for the callback thread (both this thread) to
211         // terminate if the call is active.
212         EXPECT_EQ(OkStatus(), call_1_.Cancel());
213         call_2_ = std::move(call_1_);
214 
215         callback_executed_ = callback_executed_ + 1;
216       });
217 
218   call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
219                                                     context_.channel().id());
220 
221   EXPECT_TRUE(call_1_.active());
222   EXPECT_TRUE(call_2_.active());
223 
224   // Start the callback thread and wait for it to finish.
225   callback_thread_sem_.release();
226   callback_thread_.join();
227 
228   EXPECT_EQ(callback_executed_, 1);
229   EXPECT_FALSE(call_1_.active());
230   EXPECT_FALSE(call_2_.active());
231 }
232 
TEST_F(CallbacksTest,PacketDroppedIfOnNextIsBusy)233 TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
234   call_1_ = TestService::TestBidirectionalStreamRpc(
235       context_.client(),
236       context_.channel().id(),
237       [this](const pw_rpc_test_TestStreamResponse&) {
238         main_thread_sem_.release();  // Confirm that this thread started
239 
240         callback_thread_sem_.acquire();  // Wait for the main thread to release
241 
242         callback_executed_ = callback_executed_ + 1;
243       });
244 
245   // Start the callback thread.
246   callback_thread_sem_.release();
247 
248   main_thread_sem_.acquire();  // Confirm that the callback is running
249 
250   // Handle a few packets for this call, which should be dropped since on_next
251   // is busy. callback_executed_ should remain at 1.
252   for (int i = 0; i < 5; ++i) {
253     context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
254         {}, call_1_.id());
255   }
256 
257   // Wait for the callback thread to finish.
258   callback_thread_sem_.release();
259   callback_thread_.join();
260 
261   EXPECT_EQ(callback_executed_, 1);
262 }
263 
264 }  // namespace
265 }  // namespace pw::rpc
266