xref: /aosp_15_r20/external/pigweed/pw_rpc_transport/rpc_integration_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_chrono/system_clock.h"
16 #include "pw_rpc/client_server.h"
17 #include "pw_rpc/synchronous_call.h"
18 #include "pw_rpc_transport/egress_ingress.h"
19 #include "pw_rpc_transport/internal/test.rpc.pwpb.h"
20 #include "pw_rpc_transport/local_rpc_egress.h"
21 #include "pw_rpc_transport/service_registry.h"
22 #include "pw_rpc_transport/socket_rpc_transport.h"
23 #include "pw_status/status.h"
24 #include "pw_string/string.h"
25 #include "pw_thread/thread.h"
26 #include "pw_thread/thread_core.h"
27 #include "pw_thread_stl/options.h"
28 #include "pw_unit_test/framework.h"
29 
30 namespace pw::rpc {
31 namespace {
32 
// Brings the std::chrono duration literal suffixes into this namespace.
using namespace std::chrono_literals;

// Capacity of the InlineString used to build the echo payload.
constexpr size_t kMaxTestMessageSize{1024};

// The single RPC channel ID that every endpoint in this file is wired to.
constexpr uint32_t kTestChannelId{1};
37 
38 class TestService final
39     : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
40           TestService> {
41  public:
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)42   Status Echo(
43       const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
44       pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
45     response.msg = request.msg;
46     return OkStatus();
47   }
48 };
49 
50 template <size_t kMaxPacketSize, size_t kLocalEgressQueueSize>
51 struct SocketRpcEndpoint {
SocketRpcEndpointpw::rpc::__anon734e405c0111::SocketRpcEndpoint52   explicit SocketRpcEndpoint(SocketRpcTransport<kMaxPacketSize>& rpc_transport)
53       : transport(rpc_transport),
54         rpc_egress("tx", transport),
55         tx_channels({rpc::Channel::Create<kTestChannelId>(&rpc_egress)}),
56         rx_channels({ChannelEgress{kTestChannelId, local_egress}}),
57         rpc_ingress(rx_channels),
58         service_registry(tx_channels) {
59     local_egress.set_packet_processor(service_registry);
60     transport.set_ingress(rpc_ingress);
61   }
62 
63   LocalRpcEgress<kLocalEgressQueueSize, kMaxPacketSize> local_egress;
64   SocketRpcTransport<kMaxPacketSize>& transport;
65   SimpleRpcEgress<kMaxPacketSize> rpc_egress;
66   std::array<rpc::Channel, 1> tx_channels;
67   std::array<ChannelEgress, 1> rx_channels;
68   SimpleRpcIngress<kMaxPacketSize> rpc_ingress;
69   ServiceRegistry service_registry;
70 };
71 
// End-to-end check of the socket transport: endpoint A (socket server) issues
// ten synchronous Echo calls to the TestService registered on endpoint B
// (socket client connected via localhost) and expects each payload to come
// back unchanged.
TEST(RpcIntegrationTest, SocketTransport) {
  constexpr size_t kMaxPacketSize = 512;
  constexpr size_t kLocalEgressQueueSize = 20;
  constexpr size_t kMessageSize = 50;
  constexpr int kEchoCallCount = 10;

  // Endpoint A listens on port 0; the port actually in use is read back
  // through port() below when the client side is created.
  SocketRpcTransport<kMaxPacketSize> a_to_b_transport(
      SocketRpcTransport<kMaxPacketSize>::kAsServer, /*port=*/0);
  auto a = SocketRpcEndpoint<kMaxPacketSize, kLocalEgressQueueSize>(
      a_to_b_transport);
  auto a_local_egress_thread = Thread(thread::stl::Options(), a.local_egress);
  auto a_transport_thread = Thread(thread::stl::Options(), a.transport);

  // The server must be ready before its port is handed to the client.
  a_to_b_transport.WaitUntilReady();

  // Endpoint B connects to A over localhost.
  SocketRpcTransport<kMaxPacketSize> b_to_a_transport(
      SocketRpcTransport<kMaxPacketSize>::kAsClient,
      "localhost",
      a_to_b_transport.port());

  auto b = SocketRpcEndpoint<kMaxPacketSize, kLocalEgressQueueSize>(
      b_to_a_transport);
  auto b_local_egress_thread = Thread(thread::stl::Options(), b.local_egress);
  auto b_transport_thread = Thread(thread::stl::Options(), b.transport);

  // The service lives on B; A acts purely as the RPC client.
  TestService b_test_service;
  b.service_registry.RegisterService(b_test_service);
  a_to_b_transport.WaitUntilConnected();
  b_to_a_transport.WaitUntilConnected();

  // The payload is identical for every call, so build it once.
  InlineString<kMaxTestMessageSize> test_message;
  test_message.append(kMessageSize, '*');

  for (int i = 0; i < kEchoCallCount; ++i) {
    auto echo_request = pw_rpc_transport::testing::pwpb::EchoMessage::Message{
        .msg = test_message};
    const auto echo_response = rpc::SynchronousCall<
        pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Echo>(
        a.service_registry.client_server().client(),
        kTestChannelId,
        echo_request);
    EXPECT_EQ(echo_response.status(), OkStatus());
    // Only look at the payload of a successful call. NOTE(review): per the
    // pw_rpc synchronous call API, response() is only valid when the server
    // actually responded; reading it after a failed call would abort the test
    // instead of reporting the failure recorded above - confirm.
    if (echo_response.status().ok()) {
      EXPECT_EQ(echo_response.response().msg, test_message);
    }
  }

  // Shut everything down: request every worker to stop first, then join all
  // threads so that none is still joinable when the Thread objects go away.
  a.local_egress.Stop();
  b.local_egress.Stop();
  a.transport.Stop();
  b.transport.Stop();

  a_local_egress_thread.join();
  b_local_egress_thread.join();
  a_transport_thread.join();
  b_transport_thread.join();
}
126 
127 }  // namespace
128 }  // namespace pw::rpc
129