1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/nanopb/client_testing.h"
16 #include "pw_rpc_test_protos/test.rpc.pb.h"
17 #include "pw_sync/binary_semaphore.h"
18 #include "pw_thread/non_portable_test_thread_options.h"
19 #include "pw_thread/sleep.h"
20 #include "pw_thread/thread.h"
21 #include "pw_thread/yield.h"
22 #include "pw_unit_test/framework.h"
23
24 namespace pw::rpc {
25 namespace {
26
27 using namespace std::chrono_literals;
28
29 using test::pw_rpc::nanopb::TestService;
30
31 using ClientReaderWriter =
32 NanopbClientReaderWriter<pw_rpc_test_TestRequest,
33 pw_rpc_test_TestStreamResponse>;
34
35 // These tests cover interactions between a thread moving or destroying an RPC
36 // call object and a thread running callbacks for that call. In order to test
37 // that the first thread waits for callbacks to complete when trying to move or
38 // destroy the call, it is necessary to have the callback thread yield to the
39 // other thread. There isn't a good way to synchronize these threads without
40 // changing the code under test.
YieldToOtherThread()41 void YieldToOtherThread() {
42 // Sleep for a while and then yield just to be sure the other thread runs.
43 this_thread::sleep_for(100ms);
44 this_thread::yield();
45 }
46
47 class CallbacksTest : public ::testing::Test {
48 protected:
CallbacksTest()49 CallbacksTest()
50 // TODO: b/290860904 - Replace TestOptionsThread0 with
51 // TestThreadContext.
52 : callback_thread_(thread::test::TestOptionsThread0(),
53 [this] { SendResponseAfterSemaphore(); }) {}
54
~CallbacksTest()55 ~CallbacksTest() override {
56 EXPECT_FALSE(callback_thread_.joinable()); // Tests must join the thread!
57 }
58
RespondToCall(const ClientReaderWriter & call)59 void RespondToCall(const ClientReaderWriter& call) {
60 respond_to_call_ = &call;
61 }
62
63 NanopbClientTestContext<> context_;
64 sync::BinarySemaphore callback_thread_sem_;
65 sync::BinarySemaphore main_thread_sem_;
66
67 Thread callback_thread_;
68
69 // Must be incremented exactly once by the RPC callback in each test.
70 volatile int callback_executed_ = 0;
71
72 // Variables optionally used by tests. These are in this object so lambads
73 // only need to capture [this] to access them.
74 volatile bool call_is_in_scope_ = false;
75
76 ClientReaderWriter call_1_;
77 ClientReaderWriter call_2_;
78
79 private:
SendResponseAfterSemaphore()80 void SendResponseAfterSemaphore() {
81 // Wait until the main thread says to send the response.
82 callback_thread_sem_.acquire();
83
84 context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
85 {}, respond_to_call_->id());
86 }
87
88 const ClientReaderWriter* respond_to_call_ = &call_1_;
89 };
90
// Checks that destroying a call blocks until a callback that is currently
// running for that call has returned.
TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
  // With locks disabled the callback thread can't yield, so this scenario
  // cannot be exercised. The fixture still requires the callback thread to be
  // joined, so let it run to completion before skipping.
  if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
    callback_thread_sem_.release();
    callback_thread_.join();
    GTEST_SKIP();
  }

  // on_next handler; it runs on the callback thread.
  auto on_next = [this](const pw_rpc_test_TestStreamResponse&) {
    // Let the main thread know that the callback has started.
    main_thread_sem_.release();

    // Give the main thread time to start destroying the call.
    YieldToOtherThread();

    // The main thread should be stuck in the call's destructor until this
    // callback returns, so it cannot have cleared the flag yet.
    EXPECT_TRUE(call_is_in_scope_);

    callback_executed_ = callback_executed_ + 1;
  };

  {
    ClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
        context_.client(), context_.channel().id());
    RespondToCall(local_call);
    call_is_in_scope_ = true;
    local_call.set_on_next(std::move(on_next));

    // Unblock the callback thread so that it invokes the callback, then wait
    // for the callback to report that it is running.
    callback_thread_sem_.release();
    main_thread_sem_.acquire();

    // local_call goes out of scope here while the callback is still sleeping.
  }

  // Only reached once the call has been destroyed; record that.
  call_is_in_scope_ = false;

  // Let the callback thread finish.
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
}
135
// Checks that move-assigning from an active call blocks until a callback that
// is currently running for that call has returned.
TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
  // With locks disabled the callback thread can't yield, so this scenario
  // cannot be exercised. The fixture still requires the callback thread to be
  // joined, so let it run to completion before skipping.
  if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
    callback_thread_sem_.release();
    callback_thread_.join();
    GTEST_SKIP();
  }

  // on_next handler; it runs on the callback thread.
  auto on_next = [this](const pw_rpc_test_TestStreamResponse&) {
    // Let the main thread know that the callback has started.
    main_thread_sem_.release();

    // Give the main thread time to attempt the move.
    YieldToOtherThread();

    callback_executed_ = callback_executed_ + 1;
  };

  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));

  // Unblock the callback thread so that it invokes the callback, then wait
  // for the callback to report that it is running.
  callback_thread_sem_.release();
  main_thread_sem_.acquire();

  // Move the call while its on_next callback is still executing. The move is
  // expected to block this thread until that callback is done.
  EXPECT_TRUE(call_1_.active());
  call_2_ = std::move(call_1_);

  // Because the move waited, the callback has already run to completion and
  // the active call now lives in call_2_.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_FALSE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  callback_thread_.join();
}
174
// Checks that a callback may move a different call into the call object the
// callback itself belongs to.
TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
  // on_next handler for call_1_; it runs on the callback thread.
  auto on_next = [this](const pw_rpc_test_TestStreamResponse&) {
    // Signals that the callback started. Nothing in this test acquires the
    // semaphore; the main thread simply joins the callback thread.
    main_thread_sem_.release();

    // Replace this callback's own call with call_2_.
    call_1_ = std::move(call_2_);

    callback_executed_ = callback_executed_ + 1;
  };

  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));
  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Unblock the callback thread and wait for it to run to completion.
  callback_thread_sem_.release();
  callback_thread_.join();

  // call_1_ now holds what used to be call_2_; call_2_ was moved from.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_TRUE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
201
// Checks that a callback may move its own call into another call object,
// provided that it cancels the call first.
TEST_F(CallbacksTest, MoveOwnCallInCallback) {
  // on_next handler for call_1_; it runs on the callback thread.
  auto on_next = [this](const pw_rpc_test_TestStreamResponse&) {
    // Signals that the callback started. Nothing in this test acquires the
    // semaphore; the main thread simply joins the callback thread.
    main_thread_sem_.release();

    // Moving an active call waits for that call's running callback to end.
    // Here the mover and the callback are the same thread, so the wait would
    // never finish. Cancelling first makes the call inactive and avoids that
    // deadlock.
    EXPECT_EQ(OkStatus(), call_1_.Cancel());
    call_2_ = std::move(call_1_);

    callback_executed_ = callback_executed_ + 1;
  };

  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));
  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Unblock the callback thread and wait for it to run to completion.
  callback_thread_sem_.release();
  callback_thread_.join();

  // The cancelled call was moved into call_2_, so neither call is active.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_FALSE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
232
// Checks that packets arriving for a call while its on_next callback is still
// running are dropped rather than triggering the callback again.
TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
  // on_next handler for call_1_; it runs on the callback thread.
  auto on_next = [this](const pw_rpc_test_TestStreamResponse&) {
    // Let the main thread know that the callback has started.
    main_thread_sem_.release();

    // Stay inside the callback until the main thread releases the semaphore.
    callback_thread_sem_.acquire();

    callback_executed_ = callback_executed_ + 1;
  };

  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));

  // Unblock the callback thread, then wait until the callback is running.
  callback_thread_sem_.release();
  main_thread_sem_.acquire();

  // Deliver several more packets for this call from the main thread. on_next
  // is busy, so they should be dropped and the callback must end up having
  // run only once.
  constexpr int kExtraPackets = 5;
  for (int sent = 0; sent != kExtraPackets; ++sent) {
    context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
        {}, call_1_.id());
  }

  // Let the blocked callback return and wait for its thread to finish.
  callback_thread_sem_.release();
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
}
263
264 } // namespace
265 } // namespace pw::rpc
266