xref: /aosp_15_r20/external/pigweed/pw_rpc/callback_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/raw/client_testing.h"
16 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
17 #include "pw_sync/binary_semaphore.h"
18 #include "pw_thread/non_portable_test_thread_options.h"
19 #include "pw_thread/sleep.h"
20 #include "pw_thread/thread.h"
21 #include "pw_thread/yield.h"
22 #include "pw_unit_test/framework.h"
23 
24 namespace pw::rpc {
25 namespace {
26 
27 using namespace std::chrono_literals;
28 
29 using test::pw_rpc::raw::TestService;
30 
31 // These tests cover interactions between a thread moving or destroying an RPC
32 // call object and a thread running callbacks for that call. In order to test
33 // that the first thread waits for callbacks to complete when trying to move or
34 // destroy the call, it is necessary to have the callback thread yield to the
35 // other thread. There isn't a good way to synchronize these threads without
36 // changing the code under test.
YieldToOtherThread()37 void YieldToOtherThread() {
38   // Sleep for a while and then yield just to be sure the other thread runs.
39   this_thread::sleep_for(100ms);
40   this_thread::yield();
41 }
42 
43 class CallbacksTest : public ::testing::Test {
44  protected:
CallbacksTest()45   CallbacksTest()
46       // TODO: b/290860904 - Replace TestOptionsThread0 with
47       // TestThreadContext.
48       : callback_thread_(thread::test::TestOptionsThread0(),
49                          [this] { SendResponseAfterSemaphore(); }) {}
50 
~CallbacksTest()51   ~CallbacksTest() override {
52     EXPECT_FALSE(callback_thread_.joinable());  // Tests must join the thread!
53   }
54 
RespondToCall(const RawClientReaderWriter & call)55   void RespondToCall(const RawClientReaderWriter& call) {
56     respond_to_call_ = &call;
57   }
58 
59   RawClientTestContext<> context_;
60   sync::BinarySemaphore callback_thread_sem_;
61   sync::BinarySemaphore main_thread_sem_;
62 
63   Thread callback_thread_;
64 
65   // Must be incremented exactly once by the RPC callback in each test.
66   volatile int callback_executed_ = 0;
67 
68   // Variables optionally used by tests. These are in this object so lambads
69   // only need to capture [this] to access them.
70   volatile bool call_is_in_scope_ = false;
71 
72   RawClientReaderWriter call_1_;
73   RawClientReaderWriter call_2_;
74 
75  private:
SendResponseAfterSemaphore()76   void SendResponseAfterSemaphore() {
77     // Wait until the main thread says to send the response.
78     callback_thread_sem_.acquire();
79 
80     context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
81         {}, respond_to_call_->id());
82   }
83 
84   const RawClientReaderWriter* respond_to_call_ = &call_1_;
85 };
86 
TEST_F(CallbacksTest,DestructorWaitsUntilCallbacksComplete)87 TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
88   if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
89     callback_thread_sem_.release();
90     callback_thread_.join();
91     GTEST_SKIP()
92         << "Skipping because locks are disabled, so this thread cannot yield.";
93   }
94 
95   {
96     RawClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
97         context_.client(), context_.channel().id());
98     RespondToCall(local_call);
99 
100     call_is_in_scope_ = true;
101 
102     local_call.set_on_next([this](ConstByteSpan) {
103       main_thread_sem_.release();
104 
105       // Wait for a while so the main thread tries to destroy the call.
106       YieldToOtherThread();
107 
108       // Now, make sure the call is still in scope. The main thread should
109       // block in the call's destructor until this callback completes.
110       EXPECT_TRUE(call_is_in_scope_);
111 
112       callback_executed_ = callback_executed_ + 1;
113     });
114 
115     // Start the callback thread so it can invoke the callback.
116     callback_thread_sem_.release();
117 
118     // Wait until the callback thread starts.
119     main_thread_sem_.acquire();
120   }
121 
122   // The callback thread will sleep for a bit. Meanwhile, let the call go out
123   // of scope, and mark it as such.
124   call_is_in_scope_ = false;
125 
126   // Wait for the callback thread to finish.
127   callback_thread_.join();
128 
129   EXPECT_EQ(callback_executed_, 1);
130 }
131 
TEST_F(CallbacksTest,MoveActiveCall_WaitsForCallbackToComplete)132 TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
133   if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
134     callback_thread_sem_.release();
135     callback_thread_.join();
136     GTEST_SKIP()
137         << "Skipping because locks are disabled, so this thread cannot yield.";
138   }
139 
140   call_1_ = TestService::TestBidirectionalStreamRpc(
141       context_.client(), context_.channel().id(), [this](ConstByteSpan) {
142         main_thread_sem_.release();  // Confirm that this thread started
143 
144         YieldToOtherThread();
145 
146         callback_executed_ = callback_executed_ + 1;
147       });
148 
149   // Start the callback thread so it can invoke the callback.
150   callback_thread_sem_.release();
151 
152   // Confirm that the callback thread started.
153   main_thread_sem_.acquire();
154 
155   // Move the call object. This thread should wait until the on_completed
156   // callback is done.
157   EXPECT_TRUE(call_1_.active());
158   call_2_ = std::move(call_1_);
159 
160   // The callback should already have finished. This thread should have waited
161   // for it to finish during the move.
162   EXPECT_EQ(callback_executed_, 1);
163   EXPECT_FALSE(call_1_.active());
164   EXPECT_TRUE(call_2_.active());
165 
166   callback_thread_.join();
167 }
168 
TEST_F(CallbacksTest,MoveOtherCallIntoOwnCallInCallback)169 TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
170   call_1_ = TestService::TestBidirectionalStreamRpc(
171       context_.client(), context_.channel().id(), [this](ConstByteSpan) {
172         main_thread_sem_.release();  // Confirm that this thread started
173 
174         call_1_ = std::move(call_2_);
175 
176         callback_executed_ = callback_executed_ + 1;
177       });
178 
179   call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
180                                                     context_.channel().id());
181 
182   EXPECT_TRUE(call_1_.active());
183   EXPECT_TRUE(call_2_.active());
184 
185   // Start the callback thread and wait for it to finish.
186   callback_thread_sem_.release();
187   callback_thread_.join();
188 
189   EXPECT_EQ(callback_executed_, 1);
190   EXPECT_TRUE(call_1_.active());
191   EXPECT_FALSE(call_2_.active());
192 }
193 
TEST_F(CallbacksTest,MoveOwnCallInCallback)194 TEST_F(CallbacksTest, MoveOwnCallInCallback) {
195   call_1_ = TestService::TestBidirectionalStreamRpc(
196       context_.client(), context_.channel().id(), [this](ConstByteSpan) {
197         main_thread_sem_.release();  // Confirm that this thread started
198 
199         // Cancel this call first, or the move will deadlock, since the moving
200         // thread will wait for the callback thread (both this thread) to
201         // terminate if the call is active.
202         EXPECT_EQ(OkStatus(), call_1_.Cancel());
203         call_2_ = std::move(call_1_);
204 
205         callback_executed_ = callback_executed_ + 1;
206       });
207 
208   call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
209                                                     context_.channel().id());
210 
211   EXPECT_TRUE(call_1_.active());
212   EXPECT_TRUE(call_2_.active());
213 
214   // Start the callback thread and wait for it to finish.
215   callback_thread_sem_.release();
216   callback_thread_.join();
217 
218   EXPECT_EQ(callback_executed_, 1);
219   EXPECT_FALSE(call_1_.active());
220   EXPECT_FALSE(call_2_.active());
221 }
222 
TEST_F(CallbacksTest,PacketDroppedIfOnNextIsBusy)223 TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
224   call_1_ = TestService::TestBidirectionalStreamRpc(
225       context_.client(), context_.channel().id(), [this](ConstByteSpan) {
226         main_thread_sem_.release();  // Confirm that this thread started
227 
228         callback_thread_sem_.acquire();  // Wait for the main thread to release
229 
230         callback_executed_ = callback_executed_ + 1;
231       });
232 
233   // Start the callback thread.
234   callback_thread_sem_.release();
235 
236   main_thread_sem_.acquire();  // Confirm that the callback is running
237 
238   // Handle a few packets for this call, which should be dropped since on_next
239   // is busy. callback_executed_ should remain at 1.
240   for (int i = 0; i < 5; ++i) {
241     context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
242         {}, call_1_.id());
243   }
244 
245   // Wait for the callback thread to finish.
246   callback_thread_sem_.release();
247   callback_thread_.join();
248 
249   EXPECT_EQ(callback_executed_, 1);
250 }
251 
252 }  // namespace
253 }  // namespace pw::rpc
254