1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/raw/client_testing.h"
16 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
17 #include "pw_sync/binary_semaphore.h"
18 #include "pw_thread/non_portable_test_thread_options.h"
19 #include "pw_thread/sleep.h"
20 #include "pw_thread/thread.h"
21 #include "pw_thread/yield.h"
22 #include "pw_unit_test/framework.h"
23
24 namespace pw::rpc {
25 namespace {
26
27 using namespace std::chrono_literals;
28
29 using test::pw_rpc::raw::TestService;
30
31 // These tests cover interactions between a thread moving or destroying an RPC
32 // call object and a thread running callbacks for that call. In order to test
33 // that the first thread waits for callbacks to complete when trying to move or
34 // destroy the call, it is necessary to have the callback thread yield to the
35 // other thread. There isn't a good way to synchronize these threads without
36 // changing the code under test.
YieldToOtherThread()37 void YieldToOtherThread() {
38 // Sleep for a while and then yield just to be sure the other thread runs.
39 this_thread::sleep_for(100ms);
40 this_thread::yield();
41 }
42
43 class CallbacksTest : public ::testing::Test {
44 protected:
CallbacksTest()45 CallbacksTest()
46 // TODO: b/290860904 - Replace TestOptionsThread0 with
47 // TestThreadContext.
48 : callback_thread_(thread::test::TestOptionsThread0(),
49 [this] { SendResponseAfterSemaphore(); }) {}
50
~CallbacksTest()51 ~CallbacksTest() override {
52 EXPECT_FALSE(callback_thread_.joinable()); // Tests must join the thread!
53 }
54
RespondToCall(const RawClientReaderWriter & call)55 void RespondToCall(const RawClientReaderWriter& call) {
56 respond_to_call_ = &call;
57 }
58
59 RawClientTestContext<> context_;
60 sync::BinarySemaphore callback_thread_sem_;
61 sync::BinarySemaphore main_thread_sem_;
62
63 Thread callback_thread_;
64
65 // Must be incremented exactly once by the RPC callback in each test.
66 volatile int callback_executed_ = 0;
67
68 // Variables optionally used by tests. These are in this object so lambads
69 // only need to capture [this] to access them.
70 volatile bool call_is_in_scope_ = false;
71
72 RawClientReaderWriter call_1_;
73 RawClientReaderWriter call_2_;
74
75 private:
SendResponseAfterSemaphore()76 void SendResponseAfterSemaphore() {
77 // Wait until the main thread says to send the response.
78 callback_thread_sem_.acquire();
79
80 context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
81 {}, respond_to_call_->id());
82 }
83
84 const RawClientReaderWriter* respond_to_call_ = &call_1_;
85 };
86
// Checks that destroying a call blocks while that call's on_next callback is
// still executing on another thread.
TEST_F(CallbacksTest, DestructorWaitsUntilCallbacksComplete) {
  if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
    // Let the callback thread run to completion and join it before skipping;
    // the fixture's destructor expects the thread to have been joined.
    callback_thread_sem_.release();
    callback_thread_.join();
    GTEST_SKIP()
        << "Skipping because locks are disabled, so this thread cannot yield.";
  }

  {
    RawClientReaderWriter local_call = TestService::TestBidirectionalStreamRpc(
        context_.client(), context_.channel().id());

    call_is_in_scope_ = true;

    // Have the callback thread respond to the local call rather than call_1_.
    RespondToCall(local_call);

    auto on_next = [this](ConstByteSpan) {
      // Tell the main thread that the callback is running.
      main_thread_sem_.release();

      // Pause so the main thread gets a chance to try destroying the call.
      YieldToOtherThread();

      // The call must still be in scope here: the main thread is expected to
      // be stuck in the call's destructor until this callback returns.
      EXPECT_TRUE(call_is_in_scope_);

      callback_executed_ = callback_executed_ + 1;
    };
    local_call.set_on_next(std::move(on_next));

    // Unblock the callback thread so that it triggers the callback.
    callback_thread_sem_.release();

    // Block until the callback has started executing.
    main_thread_sem_.acquire();
  }

  // While the callback thread is sleeping, the call has gone out of scope;
  // record that fact.
  call_is_in_scope_ = false;

  // Let the callback thread run to completion.
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
}
131
// Checks that move-assigning from an active call blocks until that call's
// currently running on_next callback has finished.
TEST_F(CallbacksTest, MoveActiveCall_WaitsForCallbackToComplete) {
  if (PW_RPC_USE_GLOBAL_MUTEX == 0) {
    // Let the callback thread run to completion and join it before skipping;
    // the fixture's destructor expects the thread to have been joined.
    callback_thread_sem_.release();
    callback_thread_.join();
    GTEST_SKIP()
        << "Skipping because locks are disabled, so this thread cannot yield.";
  }

  auto on_next = [this](ConstByteSpan) {
    main_thread_sem_.release();  // Signal that the callback is running

    // Give the main thread time to start moving the call.
    YieldToOtherThread();

    callback_executed_ = callback_executed_ + 1;
  };
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));

  // Unblock the callback thread so that it triggers the callback.
  callback_thread_sem_.release();

  // Block until the callback has started executing.
  main_thread_sem_.acquire();

  // Move the call object. This thread is expected to wait here until the
  // on_next callback has completed.
  EXPECT_TRUE(call_1_.active());
  call_2_ = std::move(call_1_);

  // Since the move waited for the callback, it must already have run by now.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_FALSE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  callback_thread_.join();
}
168
// Checks that a callback may replace its own call object by move-assigning a
// different call into it.
TEST_F(CallbacksTest, MoveOtherCallIntoOwnCallInCallback) {
  auto on_next = [this](ConstByteSpan) {
    main_thread_sem_.release();  // Signal that the callback is running

    // Overwrite the call this callback belongs to with the other call.
    call_1_ = std::move(call_2_);

    callback_executed_ = callback_executed_ + 1;
  };
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));

  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  // Both calls start out active.
  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Unblock the callback thread, then wait for it to run to completion.
  callback_thread_sem_.release();
  callback_thread_.join();

  // The callback ran once; call_1_ is active and call_2_ was moved from.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_TRUE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
193
// Checks that a callback may move its own call object into another call,
// provided it cancels the call first.
TEST_F(CallbacksTest, MoveOwnCallInCallback) {
  auto on_next = [this](ConstByteSpan) {
    main_thread_sem_.release();  // Signal that the callback is running

    // Cancel this call before moving it. Moving an active call waits for the
    // thread running its callbacks to finish; here the moving thread and the
    // callback thread are the same thread, so moving without cancelling
    // would deadlock.
    EXPECT_EQ(OkStatus(), call_1_.Cancel());
    call_2_ = std::move(call_1_);

    callback_executed_ = callback_executed_ + 1;
  };
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));

  call_2_ = TestService::TestBidirectionalStreamRpc(context_.client(),
                                                    context_.channel().id());

  // Both calls start out active.
  EXPECT_TRUE(call_1_.active());
  EXPECT_TRUE(call_2_.active());

  // Unblock the callback thread, then wait for it to run to completion.
  callback_thread_sem_.release();
  callback_thread_.join();

  // The callback ran once and neither call is active afterwards.
  EXPECT_EQ(callback_executed_, 1);
  EXPECT_FALSE(call_1_.active());
  EXPECT_FALSE(call_2_.active());
}
222
// Checks that packets arriving for a call while its on_next callback is still
// executing do not trigger additional callback invocations.
TEST_F(CallbacksTest, PacketDroppedIfOnNextIsBusy) {
  auto on_next = [this](ConstByteSpan) {
    main_thread_sem_.release();  // Signal that the callback is running

    // Stay inside the callback until the main thread releases it.
    callback_thread_sem_.acquire();

    callback_executed_ = callback_executed_ + 1;
  };
  call_1_ = TestService::TestBidirectionalStreamRpc(
      context_.client(), context_.channel().id(), std::move(on_next));

  // Unblock the callback thread.
  callback_thread_sem_.release();

  main_thread_sem_.acquire();  // Block until the callback is executing

  // Send several more packets for this call. They should be dropped because
  // on_next is busy, so callback_executed_ should end up at 1.
  constexpr int kExtraPackets = 5;
  for (int sent = 0; sent < kExtraPackets; ++sent) {
    context_.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
        {}, call_1_.id());
  }

  // Release the blocked callback and wait for its thread to finish.
  callback_thread_sem_.release();
  callback_thread_.join();

  EXPECT_EQ(callback_executed_, 1);
}
251
252 } // namespace
253 } // namespace pw::rpc
254