1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include <mutex>
16 #include <utility>
17
18 #include "pw_rpc/pwpb/client_server_testing.h"
19 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
20 #include "pw_sync/mutex.h"
21 #include "pw_unit_test/framework.h"
22
23 namespace pw::rpc {
24 namespace {
25
26 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
27 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
28 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
29
30 } // namespace
31
32 namespace test {
33
34 using GeneratedService = ::pw::rpc::test::pw_rpc::pwpb::TestService;
35
36 class TestService final : public GeneratedService::Service<TestService> {
37 public:
TestUnaryRpc(const TestRequest::Message & request,TestResponse::Message & response)38 Status TestUnaryRpc(const TestRequest::Message& request,
39 TestResponse::Message& response) {
40 response.value = request.integer + 1;
41 return static_cast<Status::Code>(request.status_code);
42 }
43
TestAnotherUnaryRpc(const TestRequest::Message & request,TestResponse::Message & response)44 Status TestAnotherUnaryRpc(const TestRequest::Message& request,
45 TestResponse::Message& response) {
46 response.value = 42;
47 response.repeated_field.SetEncoder(
48 [](TestResponse::StreamEncoder& encoder) {
49 constexpr std::array<uint32_t, 3> kValues = {7, 8, 9};
50 return encoder.WriteRepeatedField(kValues);
51 });
52 return static_cast<Status::Code>(request.status_code);
53 }
54
TestServerStreamRpc(const TestRequest::Message &,ServerWriter<TestStreamResponse::Message> &)55 static void TestServerStreamRpc(const TestRequest::Message&,
56 ServerWriter<TestStreamResponse::Message>&) {}
57
TestClientStreamRpc(ServerReader<TestRequest::Message,TestStreamResponse::Message> &)58 void TestClientStreamRpc(
59 ServerReader<TestRequest::Message, TestStreamResponse::Message>&) {}
60
TestBidirectionalStreamRpc(ServerReaderWriter<TestRequest::Message,TestStreamResponse::Message> &)61 void TestBidirectionalStreamRpc(
62 ServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>&) {}
63 };
64
65 } // namespace test
66
67 namespace {
68
// Issues a single unary RPC and checks the reply seen by the client callback
// against the request and response the test context reports at index 0.
TEST(PwpbClientServerTestContext, ReceivesUnaryRpcResponse) {
  PwpbClientServerTestContext<> ctx;
  test::TestService service;
  ctx.server().RegisterService(service);

  TestRequest::Message request{.integer = 1, .status_code = OkStatus().code()};

  // Assigned by the callback handed to the RPC call below.
  TestResponse::Message received = {};
  auto on_completed = [&received](const TestResponse::Message& reply,
                                  pw::Status) { received = reply; };

  auto call = test::GeneratedService::TestUnaryRpc(
      ctx.client(), ctx.channel().id(), request, on_completed);
  // The context is not threaded, so packets have to be forwarded manually.
  ctx.ForwardNewPackets();

  const auto recorded_request =
      ctx.request<test::GeneratedService::TestUnaryRpc>(0);
  const auto recorded_response =
      ctx.response<test::GeneratedService::TestUnaryRpc>(0);

  EXPECT_EQ(received.value, recorded_response.value);
  EXPECT_EQ(received.value, request.integer + 1);
  EXPECT_EQ(request.integer, recorded_request.integer);
}
93
// Issues two unary RPCs back to back and checks that each client callback
// saw the matching reply, and that the test context reports both
// request/response pairs at indices 0 and 1.
TEST(PwpbClientServerTestContext, ReceivesMultipleResponses) {
  PwpbClientServerTestContext<> ctx;
  test::TestService service;
  ctx.server().RegisterService(service);

  TestRequest::Message request1{.integer = 1, .status_code = OkStatus().code()};
  TestRequest::Message request2{.integer = 2, .status_code = OkStatus().code()};

  // Each reply slot is assigned by its own callback.
  TestResponse::Message first_reply = {};
  TestResponse::Message second_reply = {};
  auto on_first = [&first_reply](const TestResponse::Message& reply,
                                 pw::Status) { first_reply = reply; };
  auto on_second = [&second_reply](const TestResponse::Message& reply,
                                   pw::Status) { second_reply = reply; };

  // The context is not threaded, so packets have to be forwarded manually
  // after each call.
  const auto first_call = test::GeneratedService::TestUnaryRpc(
      ctx.client(), ctx.channel().id(), request1, on_first);
  ctx.ForwardNewPackets();
  const auto second_call = test::GeneratedService::TestUnaryRpc(
      ctx.client(), ctx.channel().id(), request2, on_second);
  ctx.ForwardNewPackets();

  const auto recorded_request1 =
      ctx.request<test::GeneratedService::TestUnaryRpc>(0);
  const auto recorded_request2 =
      ctx.request<test::GeneratedService::TestUnaryRpc>(1);
  const auto recorded_response1 =
      ctx.response<test::GeneratedService::TestUnaryRpc>(0);
  const auto recorded_response2 =
      ctx.response<test::GeneratedService::TestUnaryRpc>(1);

  EXPECT_EQ(first_reply.value, request1.integer + 1);
  EXPECT_EQ(second_reply.value, request2.integer + 1);
  EXPECT_EQ(first_reply.value, recorded_response1.value);
  EXPECT_EQ(second_reply.value, recorded_response2.value);
  EXPECT_EQ(request1.integer, recorded_request1.integer);
  EXPECT_EQ(request2.integer, recorded_request2.integer);
}
133
// Constructs the context with custom server and client packet processors.
// Each processor increments its own mutex-guarded counter and then delegates
// to ClientServer::ProcessPacket. After two unary RPCs, both counters are
// expected to read 2 and the replies are expected to match the requests.
TEST(PwpbClientServerTestContext,
     ReceivesMultipleResponsesWithPacketProcessor) {
  // A counter paired with the mutex that guards it.
  using ProtectedInt = std::pair<int, pw::sync::Mutex>;

  ProtectedInt server_counter{};
  auto server_processor = [&server_counter](
                              ClientServer& client_server,
                              pw::ConstByteSpan packet) -> pw::Status {
    {
      // Scoped so the lock is released before the packet is processed.
      std::lock_guard lock(server_counter.second);
      ++server_counter.first;
    }
    return client_server.ProcessPacket(packet);
  };

  ProtectedInt client_counter{};
  auto client_processor = [&client_counter](
                              ClientServer& client_server,
                              pw::ConstByteSpan packet) -> pw::Status {
    {
      // Scoped so the lock is released before the packet is processed.
      std::lock_guard lock(client_counter.second);
      ++client_counter.first;
    }
    return client_server.ProcessPacket(packet);
  };

  PwpbClientServerTestContext<> ctx(server_processor, client_processor);
  test::TestService service;
  ctx.server().RegisterService(service);

  TestResponse::Message response1 = {};
  TestResponse::Message response2 = {};
  auto handler1 = [&response1](const TestResponse::Message& server_response,
                               pw::Status) { response1 = server_response; };
  auto handler2 = [&response2](const TestResponse::Message& server_response,
                               pw::Status) { response2 = server_response; };

  TestRequest::Message request1{.integer = 1, .status_code = OkStatus().code()};
  TestRequest::Message request2{.integer = 2, .status_code = OkStatus().code()};
  const auto call1 = test::GeneratedService::TestUnaryRpc(
      ctx.client(), ctx.channel().id(), request1, handler1);
  // Force manual forwarding of packets as context is not threaded
  ctx.ForwardNewPackets();
  const auto call2 = test::GeneratedService::TestUnaryRpc(
      ctx.client(), ctx.channel().id(), request2, handler2);
  // Force manual forwarding of packets as context is not threaded
  ctx.ForwardNewPackets();

  const auto sent_request1 =
      ctx.request<test::pw_rpc::pwpb::TestService::TestUnaryRpc>(0);
  const auto sent_request2 =
      ctx.request<test::pw_rpc::pwpb::TestService::TestUnaryRpc>(1);
  const auto sent_response1 =
      ctx.response<test::pw_rpc::pwpb::TestService::TestUnaryRpc>(0);
  const auto sent_response2 =
      ctx.response<test::pw_rpc::pwpb::TestService::TestUnaryRpc>(1);

  EXPECT_EQ(response1.value, request1.integer + 1);
  EXPECT_EQ(response2.value, request2.integer + 1);
  EXPECT_EQ(response1.value, sent_response1.value);
  EXPECT_EQ(response2.value, sent_response2.value);
  EXPECT_EQ(request1.integer, sent_request1.integer);
  EXPECT_EQ(request2.integer, sent_request2.integer);

  // Read each counter under its mutex; the guard unlocks on scope exit.
  {
    std::lock_guard lock(server_counter.second);
    EXPECT_EQ(server_counter.first, 2);
  }
  {
    std::lock_guard lock(client_counter.second);
    EXPECT_EQ(client_counter.first, 2);
  }
}
202
// Calls TestAnotherUnaryRpc, whose response has a callback-encoded repeated
// field, and decodes that response through a caller-supplied message that
// has a decoder callback installed.
TEST(PwpbClientServerTestContext, ResponseWithCallbacks) {
  PwpbClientServerTestContext<> ctx;
  test::TestService service;
  ctx.server().RegisterService(service);

  TestRequest::Message request{};
  const auto call = test::GeneratedService::TestAnotherUnaryRpc(
      ctx.client(), ctx.channel().id(), request);
  // Force manual forwarding of packets as context is not threaded
  ctx.ForwardNewPackets();

  // To decode a response whose fields need decoder callbacks, install the
  // callbacks on a message object and pass that object to response().
  pw::Vector<uint32_t, 4> values{};

  TestResponse::Message response{};
  response.repeated_field.SetDecoder(
      [&values](TestResponse::StreamDecoder& decoder) {
        return decoder.ReadRepeatedField(values);
      });
  ctx.response<test::GeneratedService::TestAnotherUnaryRpc>(0, response);

  EXPECT_EQ(42, response.value);

  // Fatal check: the element accesses below are only valid when exactly
  // three values were decoded.
  ASSERT_EQ(3u, values.size());
  EXPECT_EQ(7u, values[0]);
  EXPECT_EQ(8u, values[1]);
  EXPECT_EQ(9u, values[2]);
}
232
233 } // namespace
234 } // namespace pw::rpc
235