xref: /aosp_15_r20/external/pigweed/pw_transfer/integration_test/server.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2022 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker // Simple RPC server with the transfer service registered. Reads HDLC frames
16*61c4878aSAndroid Build Coastguard Worker // with RPC packets through a socket. This server has a single resource ID that
17*61c4878aSAndroid Build Coastguard Worker // is available, and data must be written to the server before data can be read
18*61c4878aSAndroid Build Coastguard Worker // from the resource ID.
19*61c4878aSAndroid Build Coastguard Worker //
20*61c4878aSAndroid Build Coastguard Worker // Usage:
21*61c4878aSAndroid Build Coastguard Worker //
22*61c4878aSAndroid Build Coastguard Worker //   integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'"
23*61c4878aSAndroid Build Coastguard Worker 
24*61c4878aSAndroid Build Coastguard Worker #include <sys/socket.h>
25*61c4878aSAndroid Build Coastguard Worker 
26*61c4878aSAndroid Build Coastguard Worker #include <chrono>
27*61c4878aSAndroid Build Coastguard Worker #include <cstddef>
28*61c4878aSAndroid Build Coastguard Worker #include <cstdlib>
29*61c4878aSAndroid Build Coastguard Worker #include <deque>
30*61c4878aSAndroid Build Coastguard Worker #include <map>
31*61c4878aSAndroid Build Coastguard Worker #include <memory>
32*61c4878aSAndroid Build Coastguard Worker #include <string>
33*61c4878aSAndroid Build Coastguard Worker #include <thread>
34*61c4878aSAndroid Build Coastguard Worker #include <utility>
35*61c4878aSAndroid Build Coastguard Worker #include <variant>
36*61c4878aSAndroid Build Coastguard Worker #include <vector>
37*61c4878aSAndroid Build Coastguard Worker 
38*61c4878aSAndroid Build Coastguard Worker #include "google/protobuf/text_format.h"
39*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
40*61c4878aSAndroid Build Coastguard Worker #include "pw_chrono/system_clock.h"
41*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
42*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc_system_server/rpc_server.h"
43*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc_system_server/socket.h"
44*61c4878aSAndroid Build Coastguard Worker #include "pw_stream/std_file_stream.h"
45*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/thread.h"
46*61c4878aSAndroid Build Coastguard Worker #include "pw_thread_stl/options.h"
47*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/integration_test/config.pb.h"
48*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/transfer.h"
49*61c4878aSAndroid Build Coastguard Worker 
50*61c4878aSAndroid Build Coastguard Worker namespace pw::transfer {
51*61c4878aSAndroid Build Coastguard Worker namespace {
52*61c4878aSAndroid Build Coastguard Worker 
53*61c4878aSAndroid Build Coastguard Worker using stream::MemoryReader;
54*61c4878aSAndroid Build Coastguard Worker using stream::MemoryWriter;
55*61c4878aSAndroid Build Coastguard Worker 
56*61c4878aSAndroid Build Coastguard Worker // This is the maximum size of the socket send buffers. Ideally, this is set
57*61c4878aSAndroid Build Coastguard Worker // to the lowest allowed value to minimize buffering between the proxy and
58*61c4878aSAndroid Build Coastguard Worker // clients so rate limiting causes the client to block and wait for the
59*61c4878aSAndroid Build Coastguard Worker // integration test proxy to drain rather than allowing OS buffers to backlog
60*61c4878aSAndroid Build Coastguard Worker // large quantities of data.
61*61c4878aSAndroid Build Coastguard Worker //
// Note that the OS may choose not to strictly follow this requested buffer
// size. Still, setting this value to be as small as possible does reduce
// buffer sizes significantly enough to better reflect typical inter-device
// communication.
65*61c4878aSAndroid Build Coastguard Worker //
66*61c4878aSAndroid Build Coastguard Worker // For this to be effective, servers should also configure their sockets to a
67*61c4878aSAndroid Build Coastguard Worker // smaller receive buffer size.
68*61c4878aSAndroid Build Coastguard Worker constexpr int kMaxSocketSendBufferSize = 1;
69*61c4878aSAndroid Build Coastguard Worker 
70*61c4878aSAndroid Build Coastguard Worker class FileTransferHandler final : public ReadWriteHandler {
71*61c4878aSAndroid Build Coastguard Worker  public:
FileTransferHandler(uint32_t resource_id,std::deque<std::string> && sources,std::deque<std::string> && destinations,std::string default_source_path,std::string default_destination_path,bool offsettable)72*61c4878aSAndroid Build Coastguard Worker   FileTransferHandler(uint32_t resource_id,
73*61c4878aSAndroid Build Coastguard Worker                       std::deque<std::string>&& sources,
74*61c4878aSAndroid Build Coastguard Worker                       std::deque<std::string>&& destinations,
75*61c4878aSAndroid Build Coastguard Worker                       std::string default_source_path,
76*61c4878aSAndroid Build Coastguard Worker                       std::string default_destination_path,
77*61c4878aSAndroid Build Coastguard Worker                       bool offsettable)
78*61c4878aSAndroid Build Coastguard Worker       : ReadWriteHandler(resource_id),
79*61c4878aSAndroid Build Coastguard Worker         sources_(sources),
80*61c4878aSAndroid Build Coastguard Worker         destinations_(destinations),
81*61c4878aSAndroid Build Coastguard Worker         default_source_path_(default_source_path),
82*61c4878aSAndroid Build Coastguard Worker         default_destination_path_(default_destination_path),
83*61c4878aSAndroid Build Coastguard Worker         offsettable(offsettable) {}
84*61c4878aSAndroid Build Coastguard Worker 
85*61c4878aSAndroid Build Coastguard Worker   ~FileTransferHandler() = default;
86*61c4878aSAndroid Build Coastguard Worker 
PrepareRead()87*61c4878aSAndroid Build Coastguard Worker   Status PrepareRead() final {
88*61c4878aSAndroid Build Coastguard Worker     if (sources_.empty() && default_source_path_.length() == 0) {
89*61c4878aSAndroid Build Coastguard Worker       PW_LOG_ERROR("Source paths exhausted");
90*61c4878aSAndroid Build Coastguard Worker       return Status::ResourceExhausted();
91*61c4878aSAndroid Build Coastguard Worker     }
92*61c4878aSAndroid Build Coastguard Worker 
93*61c4878aSAndroid Build Coastguard Worker     std::string path;
94*61c4878aSAndroid Build Coastguard Worker     if (!sources_.empty()) {
95*61c4878aSAndroid Build Coastguard Worker       path = sources_.front();
96*61c4878aSAndroid Build Coastguard Worker       sources_.pop_front();
97*61c4878aSAndroid Build Coastguard Worker     } else {
98*61c4878aSAndroid Build Coastguard Worker       path = default_source_path_;
99*61c4878aSAndroid Build Coastguard Worker     }
100*61c4878aSAndroid Build Coastguard Worker 
101*61c4878aSAndroid Build Coastguard Worker     PW_LOG_DEBUG("Preparing read for file %s", path.c_str());
102*61c4878aSAndroid Build Coastguard Worker     set_reader(stream_.emplace<stream::StdFileReader>(path.c_str()));
103*61c4878aSAndroid Build Coastguard Worker     return OkStatus();
104*61c4878aSAndroid Build Coastguard Worker   }
105*61c4878aSAndroid Build Coastguard Worker 
PrepareRead(uint32_t offset)106*61c4878aSAndroid Build Coastguard Worker   Status PrepareRead(uint32_t offset) final {
107*61c4878aSAndroid Build Coastguard Worker     if (!offsettable) {
108*61c4878aSAndroid Build Coastguard Worker       return Status::Unimplemented();
109*61c4878aSAndroid Build Coastguard Worker     }
110*61c4878aSAndroid Build Coastguard Worker 
111*61c4878aSAndroid Build Coastguard Worker     if (Status status = PrepareRead(); !status.ok()) {
112*61c4878aSAndroid Build Coastguard Worker       return status;
113*61c4878aSAndroid Build Coastguard Worker     }
114*61c4878aSAndroid Build Coastguard Worker 
115*61c4878aSAndroid Build Coastguard Worker     if (offset >
116*61c4878aSAndroid Build Coastguard Worker         std::get<stream::StdFileReader>(stream_).ConservativeReadLimit()) {
117*61c4878aSAndroid Build Coastguard Worker       return Status::ResourceExhausted();
118*61c4878aSAndroid Build Coastguard Worker     }
119*61c4878aSAndroid Build Coastguard Worker 
120*61c4878aSAndroid Build Coastguard Worker     return std::get<stream::StdFileReader>(stream_).Seek(offset);
121*61c4878aSAndroid Build Coastguard Worker   }
122*61c4878aSAndroid Build Coastguard Worker 
FinalizeRead(Status)123*61c4878aSAndroid Build Coastguard Worker   void FinalizeRead(Status) final {
124*61c4878aSAndroid Build Coastguard Worker     std::get<stream::StdFileReader>(stream_).Close();
125*61c4878aSAndroid Build Coastguard Worker   }
126*61c4878aSAndroid Build Coastguard Worker 
PrepareWrite()127*61c4878aSAndroid Build Coastguard Worker   Status PrepareWrite() final {
128*61c4878aSAndroid Build Coastguard Worker     if (destinations_.empty() && default_destination_path_.length() == 0) {
129*61c4878aSAndroid Build Coastguard Worker       PW_LOG_ERROR("Destination paths exhausted");
130*61c4878aSAndroid Build Coastguard Worker       return Status::ResourceExhausted();
131*61c4878aSAndroid Build Coastguard Worker     }
132*61c4878aSAndroid Build Coastguard Worker 
133*61c4878aSAndroid Build Coastguard Worker     std::string path;
134*61c4878aSAndroid Build Coastguard Worker     if (!destinations_.empty()) {
135*61c4878aSAndroid Build Coastguard Worker       path = destinations_.front();
136*61c4878aSAndroid Build Coastguard Worker       destinations_.pop_front();
137*61c4878aSAndroid Build Coastguard Worker     } else {
138*61c4878aSAndroid Build Coastguard Worker       path = default_destination_path_;
139*61c4878aSAndroid Build Coastguard Worker     }
140*61c4878aSAndroid Build Coastguard Worker 
141*61c4878aSAndroid Build Coastguard Worker     PW_LOG_DEBUG("Preparing write for file %s", path.c_str());
142*61c4878aSAndroid Build Coastguard Worker     set_writer(stream_.emplace<stream::StdFileWriter>(path.c_str()));
143*61c4878aSAndroid Build Coastguard Worker     return OkStatus();
144*61c4878aSAndroid Build Coastguard Worker   }
145*61c4878aSAndroid Build Coastguard Worker 
PrepareWrite(uint32_t offset)146*61c4878aSAndroid Build Coastguard Worker   Status PrepareWrite(uint32_t offset) final {
147*61c4878aSAndroid Build Coastguard Worker     if (!offsettable) {
148*61c4878aSAndroid Build Coastguard Worker       return Status::Unimplemented();
149*61c4878aSAndroid Build Coastguard Worker     }
150*61c4878aSAndroid Build Coastguard Worker 
151*61c4878aSAndroid Build Coastguard Worker     if (Status status = PrepareWrite(); !status.ok()) {
152*61c4878aSAndroid Build Coastguard Worker       return status;
153*61c4878aSAndroid Build Coastguard Worker     }
154*61c4878aSAndroid Build Coastguard Worker 
155*61c4878aSAndroid Build Coastguard Worker     // It does not appear possible to hit this limit
156*61c4878aSAndroid Build Coastguard Worker     if (offset >
157*61c4878aSAndroid Build Coastguard Worker         std::get<stream::StdFileWriter>(stream_).ConservativeWriteLimit()) {
158*61c4878aSAndroid Build Coastguard Worker       return Status::ResourceExhausted();
159*61c4878aSAndroid Build Coastguard Worker     }
160*61c4878aSAndroid Build Coastguard Worker 
161*61c4878aSAndroid Build Coastguard Worker     return std::get<stream::StdFileWriter>(stream_).Seek(offset);
162*61c4878aSAndroid Build Coastguard Worker   }
163*61c4878aSAndroid Build Coastguard Worker 
FinalizeWrite(Status)164*61c4878aSAndroid Build Coastguard Worker   Status FinalizeWrite(Status) final {
165*61c4878aSAndroid Build Coastguard Worker     std::get<stream::StdFileWriter>(stream_).Close();
166*61c4878aSAndroid Build Coastguard Worker     return OkStatus();
167*61c4878aSAndroid Build Coastguard Worker   }
168*61c4878aSAndroid Build Coastguard Worker 
169*61c4878aSAndroid Build Coastguard Worker  private:
170*61c4878aSAndroid Build Coastguard Worker   std::deque<std::string> sources_;
171*61c4878aSAndroid Build Coastguard Worker   std::deque<std::string> destinations_;
172*61c4878aSAndroid Build Coastguard Worker   std::string default_source_path_;
173*61c4878aSAndroid Build Coastguard Worker   std::string default_destination_path_;
174*61c4878aSAndroid Build Coastguard Worker   std::variant<std::monostate, stream::StdFileReader, stream::StdFileWriter>
175*61c4878aSAndroid Build Coastguard Worker       stream_;
176*61c4878aSAndroid Build Coastguard Worker   bool offsettable;
177*61c4878aSAndroid Build Coastguard Worker };
178*61c4878aSAndroid Build Coastguard Worker 
RunServer(int socket_port,ServerConfig config)179*61c4878aSAndroid Build Coastguard Worker void RunServer(int socket_port, ServerConfig config) {
180*61c4878aSAndroid Build Coastguard Worker   std::vector<std::byte> chunk_buffer(config.chunk_size_bytes());
181*61c4878aSAndroid Build Coastguard Worker   std::vector<std::byte> encode_buffer(config.chunk_size_bytes());
182*61c4878aSAndroid Build Coastguard Worker   transfer::Thread<4, 4> transfer_thread(chunk_buffer, encode_buffer);
183*61c4878aSAndroid Build Coastguard Worker   TransferService transfer_service(
184*61c4878aSAndroid Build Coastguard Worker       transfer_thread,
185*61c4878aSAndroid Build Coastguard Worker       config.pending_bytes(),
186*61c4878aSAndroid Build Coastguard Worker       std::chrono::seconds(config.chunk_timeout_seconds()),
187*61c4878aSAndroid Build Coastguard Worker       config.transfer_service_retries(),
188*61c4878aSAndroid Build Coastguard Worker       config.extend_window_divisor());
189*61c4878aSAndroid Build Coastguard Worker 
190*61c4878aSAndroid Build Coastguard Worker   rpc::system_server::set_socket_port(socket_port);
191*61c4878aSAndroid Build Coastguard Worker 
192*61c4878aSAndroid Build Coastguard Worker   rpc::system_server::Init();
193*61c4878aSAndroid Build Coastguard Worker   rpc::system_server::Server().RegisterService(transfer_service);
194*61c4878aSAndroid Build Coastguard Worker 
195*61c4878aSAndroid Build Coastguard Worker   // Start transfer thread.
196*61c4878aSAndroid Build Coastguard Worker   pw::Thread transfer_thread_handle(thread::stl::Options(), transfer_thread);
197*61c4878aSAndroid Build Coastguard Worker 
198*61c4878aSAndroid Build Coastguard Worker   int retval =
199*61c4878aSAndroid Build Coastguard Worker       rpc::system_server::SetServerSockOpt(SOL_SOCKET,
200*61c4878aSAndroid Build Coastguard Worker                                            SO_SNDBUF,
201*61c4878aSAndroid Build Coastguard Worker                                            &kMaxSocketSendBufferSize,
202*61c4878aSAndroid Build Coastguard Worker                                            sizeof(kMaxSocketSendBufferSize));
203*61c4878aSAndroid Build Coastguard Worker   PW_CHECK_INT_EQ(retval,
204*61c4878aSAndroid Build Coastguard Worker                   0,
205*61c4878aSAndroid Build Coastguard Worker                   "Failed to configure socket send buffer size with errno=%d",
206*61c4878aSAndroid Build Coastguard Worker                   errno);
207*61c4878aSAndroid Build Coastguard Worker 
208*61c4878aSAndroid Build Coastguard Worker   std::vector<std::unique_ptr<FileTransferHandler>> handlers;
209*61c4878aSAndroid Build Coastguard Worker   for (const auto& resource : config.resources()) {
210*61c4878aSAndroid Build Coastguard Worker     uint32_t id = resource.first;
211*61c4878aSAndroid Build Coastguard Worker 
212*61c4878aSAndroid Build Coastguard Worker     std::deque<std::string> source_paths(resource.second.source_paths().begin(),
213*61c4878aSAndroid Build Coastguard Worker                                          resource.second.source_paths().end());
214*61c4878aSAndroid Build Coastguard Worker     std::deque<std::string> destination_paths(
215*61c4878aSAndroid Build Coastguard Worker         resource.second.destination_paths().begin(),
216*61c4878aSAndroid Build Coastguard Worker         resource.second.destination_paths().end());
217*61c4878aSAndroid Build Coastguard Worker 
218*61c4878aSAndroid Build Coastguard Worker     auto handler = std::make_unique<FileTransferHandler>(
219*61c4878aSAndroid Build Coastguard Worker         id,
220*61c4878aSAndroid Build Coastguard Worker         std::move(source_paths),
221*61c4878aSAndroid Build Coastguard Worker         std::move(destination_paths),
222*61c4878aSAndroid Build Coastguard Worker         resource.second.default_source_path(),
223*61c4878aSAndroid Build Coastguard Worker         resource.second.default_destination_path(),
224*61c4878aSAndroid Build Coastguard Worker         resource.second.offsettable());
225*61c4878aSAndroid Build Coastguard Worker 
226*61c4878aSAndroid Build Coastguard Worker     transfer_service.RegisterHandler(*handler);
227*61c4878aSAndroid Build Coastguard Worker     handlers.push_back(std::move(handler));
228*61c4878aSAndroid Build Coastguard Worker   }
229*61c4878aSAndroid Build Coastguard Worker 
230*61c4878aSAndroid Build Coastguard Worker   PW_LOG_INFO("Starting pw_rpc server");
231*61c4878aSAndroid Build Coastguard Worker   PW_CHECK_OK(rpc::system_server::Start());
232*61c4878aSAndroid Build Coastguard Worker 
233*61c4878aSAndroid Build Coastguard Worker   // Unregister transfer handler before cleaning up the thread since doing so
234*61c4878aSAndroid Build Coastguard Worker   // requires the transfer thread to be running.
235*61c4878aSAndroid Build Coastguard Worker   for (auto& handler : handlers) {
236*61c4878aSAndroid Build Coastguard Worker     transfer_service.UnregisterHandler(*handler);
237*61c4878aSAndroid Build Coastguard Worker   }
238*61c4878aSAndroid Build Coastguard Worker 
239*61c4878aSAndroid Build Coastguard Worker   // End transfer thread.
240*61c4878aSAndroid Build Coastguard Worker   transfer_thread.Terminate();
241*61c4878aSAndroid Build Coastguard Worker   transfer_thread_handle.join();
242*61c4878aSAndroid Build Coastguard Worker }
243*61c4878aSAndroid Build Coastguard Worker 
244*61c4878aSAndroid Build Coastguard Worker }  // namespace
245*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::transfer
246*61c4878aSAndroid Build Coastguard Worker 
main(int argc,char * argv[])247*61c4878aSAndroid Build Coastguard Worker int main(int argc, char* argv[]) {
248*61c4878aSAndroid Build Coastguard Worker   if (argc != 2) {
249*61c4878aSAndroid Build Coastguard Worker     PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
250*61c4878aSAndroid Build Coastguard Worker     return 1;
251*61c4878aSAndroid Build Coastguard Worker   }
252*61c4878aSAndroid Build Coastguard Worker 
253*61c4878aSAndroid Build Coastguard Worker   int port = std::atoi(argv[1]);
254*61c4878aSAndroid Build Coastguard Worker   PW_CHECK_UINT_GT(port, 0, "Invalid port!");
255*61c4878aSAndroid Build Coastguard Worker 
256*61c4878aSAndroid Build Coastguard Worker   std::string config_string;
257*61c4878aSAndroid Build Coastguard Worker   std::string line;
258*61c4878aSAndroid Build Coastguard Worker   while (std::getline(std::cin, line)) {
259*61c4878aSAndroid Build Coastguard Worker     config_string = config_string + line + '\n';
260*61c4878aSAndroid Build Coastguard Worker   }
261*61c4878aSAndroid Build Coastguard Worker   pw::transfer::ServerConfig config;
262*61c4878aSAndroid Build Coastguard Worker 
263*61c4878aSAndroid Build Coastguard Worker   bool ok =
264*61c4878aSAndroid Build Coastguard Worker       google::protobuf::TextFormat::ParseFromString(config_string, &config);
265*61c4878aSAndroid Build Coastguard Worker   if (!ok) {
266*61c4878aSAndroid Build Coastguard Worker     PW_LOG_INFO("Failed to parse config: %s", config_string.c_str());
267*61c4878aSAndroid Build Coastguard Worker     PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
268*61c4878aSAndroid Build Coastguard Worker     return 1;
269*61c4878aSAndroid Build Coastguard Worker   } else {
270*61c4878aSAndroid Build Coastguard Worker     PW_LOG_INFO("Server loaded config:\n%s", config.DebugString().c_str());
271*61c4878aSAndroid Build Coastguard Worker   }
272*61c4878aSAndroid Build Coastguard Worker 
273*61c4878aSAndroid Build Coastguard Worker   pw::transfer::RunServer(port, config);
274*61c4878aSAndroid Build Coastguard Worker   return 0;
275*61c4878aSAndroid Build Coastguard Worker }
276