xref: /aosp_15_r20/external/pigweed/pw_transfer/integration_test/JavaClient.java (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 import com.google.common.collect.ImmutableList;
16 import com.google.protobuf.TextFormat;
17 import dev.pigweed.pw_hdlc.Decoder;
18 import dev.pigweed.pw_hdlc.Encoder;
19 import dev.pigweed.pw_hdlc.Frame;
20 import dev.pigweed.pw_log.Logger;
21 import dev.pigweed.pw_rpc.Channel;
22 import dev.pigweed.pw_rpc.ChannelOutputException;
23 import dev.pigweed.pw_rpc.Client;
24 import dev.pigweed.pw_rpc.Status;
25 import dev.pigweed.pw_transfer.ProtocolVersion;
26 import dev.pigweed.pw_transfer.TransferClient;
27 import dev.pigweed.pw_transfer.TransferError;
28 import dev.pigweed.pw_transfer.TransferService;
29 import dev.pigweed.pw_transfer.TransferTimeoutSettings;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.OutputStream;
33 import java.net.Socket;
34 import java.net.SocketException;
35 import java.nio.ByteBuffer;
36 import java.nio.charset.StandardCharsets;
37 import java.nio.file.Files;
38 import java.nio.file.Path;
39 import java.nio.file.Paths;
40 import java.util.concurrent.ExecutionException;
41 import pw.transfer.ConfigProtos;
42 import pw.transfer.ConfigProtos.TransferAction;
43 
44 public class JavaClient {
45   private static final String SERVICE = "pw.transfer.Transfer";
46   private static final Logger logger = Logger.forClass(Client.class);
47 
48   private static final int CHANNEL_ID = 1;
49   private static final long RPC_HDLC_ADDRESS = 'R';
50   private static final String HOSTNAME = "localhost";
51 
52   // This is the maximum size of the socket send buffers. Ideally, this is set
53   // to the lowest allowed value to minimize buffering between the proxy and
54   // clients so rate limiting causes the client to block and wait for the
55   // integration test proxy to drain rather than allowing OS buffers to backlog
56   // large quantities of data.
57   //
58   // Note that the OS may chose to not strictly follow this requested buffer
59   // size. Still, setting this value to be as small as possible does reduce
60   // bufer sizes significantly enough to better reflect typical inter-device
61   // communication.
62   //
63   // For this to be effective, servers should also configure their sockets to a
64   // smaller receive buffer size.
65   private static final int MAX_SOCKET_SEND_BUFFER_SIZE = 1;
66 
67   private final HdlcRpcChannelOutput channelOutput;
68   private final Client rpcClient;
69   private final HdlcParseThread parseThread;
70 
JavaClient(OutputStream writer, InputStream reader)71   public JavaClient(OutputStream writer, InputStream reader) {
72     this.channelOutput = new HdlcRpcChannelOutput(writer, RPC_HDLC_ADDRESS);
73 
74     this.rpcClient = Client.create(ImmutableList.of(new Channel(CHANNEL_ID, this.channelOutput)),
75         ImmutableList.of(TransferService.get()));
76 
77     this.parseThread = new HdlcParseThread(reader, this.rpcClient);
78   }
79 
startClient()80   void startClient() {
81     parseThread.start();
82   }
83 
getRpcClient()84   Client getRpcClient() {
85     return this.rpcClient;
86   }
87 
88   private class HdlcRpcChannelOutput implements Channel.Output {
89     private final OutputStream writer;
90     private final long address;
91 
HdlcRpcChannelOutput(OutputStream writer, long address)92     public HdlcRpcChannelOutput(OutputStream writer, long address) {
93       this.writer = writer;
94       this.address = address;
95     }
96 
send(byte[] packet)97     public void send(byte[] packet) throws ChannelOutputException {
98       try {
99         Encoder.writeUiFrame(this.address, ByteBuffer.wrap(packet), this.writer);
100       } catch (IOException e) {
101         throw new ChannelOutputException("Failed to write HDLC UI Frame", e);
102       }
103     }
104   }
105 
106   private class HdlcParseThread extends Thread {
107     private final InputStream reader;
108     private final RpcOnComplete frame_handler;
109     private final Decoder decoder;
110 
111     private class RpcOnComplete implements Decoder.OnCompleteFrame {
112       private final Client rpc_client;
113 
RpcOnComplete(Client rpc_client)114       public RpcOnComplete(Client rpc_client) {
115         this.rpc_client = rpc_client;
116       }
117 
onCompleteFrame(Frame frame)118       public void onCompleteFrame(Frame frame) {
119         if (frame.getAddress() == RPC_HDLC_ADDRESS) {
120           this.rpc_client.processPacket(frame.getPayload());
121         }
122       }
123     }
124 
HdlcParseThread(InputStream reader, Client rpc_client)125     public HdlcParseThread(InputStream reader, Client rpc_client) {
126       this.reader = reader;
127       this.frame_handler = new RpcOnComplete(rpc_client);
128       this.decoder = new Decoder(this.frame_handler);
129     }
130 
run()131     public void run() {
132       while (true) {
133         int val = 0;
134         try {
135           val = this.reader.read();
136         } catch (IOException e) {
137           logger.atSevere().log("HDLC parse thread read failed");
138           System.exit(1);
139         }
140         this.decoder.process((byte) val);
141       }
142     }
143   }
144 
ParseConfigFrom(InputStream reader)145   public static ConfigProtos.ClientConfig ParseConfigFrom(InputStream reader) throws IOException {
146     byte[] buffer = new byte[reader.available()];
147     reader.read(buffer);
148     ConfigProtos.ClientConfig.Builder config_builder = ConfigProtos.ClientConfig.newBuilder();
149     TextFormat.merge(new String(buffer, StandardCharsets.UTF_8), config_builder);
150     if (config_builder.getChunkTimeoutMs() == 0) {
151       throw new AssertionError("chunk_timeout_ms may not be 0");
152     }
153     if (config_builder.getInitialChunkTimeoutMs() == 0) {
154       throw new AssertionError("initial_chunk_timeout_ms may not be 0");
155     }
156     if (config_builder.getMaxRetries() == 0) {
157       throw new AssertionError("max_retries may not be 0");
158     }
159     if (config_builder.getMaxLifetimeRetries() == 0) {
160       throw new AssertionError("max_lifetime_retries may not be 0");
161     }
162     return config_builder.build();
163   }
164 
ReadFromServer(int resourceId, Path fileName, TransferClient client, Status expected_status, int initial_offset)165   public static void ReadFromServer(int resourceId,
166       Path fileName,
167       TransferClient client,
168       Status expected_status,
169       int initial_offset) {
170     byte[] data;
171     try {
172       data = client.read(resourceId, initial_offset).get();
173     } catch (ExecutionException e) {
174       if (((TransferError) e.getCause()).status() != expected_status) {
175         throw new AssertionError("Unexpected transfer read failure", e);
176       }
177       // Expected failure occurred, skip trying to write the data knowing that
178       // it is missing.
179       return;
180     } catch (InterruptedException e) {
181       throw new AssertionError("Read from server failed", e);
182     }
183 
184     if (expected_status != Status.OK) {
185       throw new AssertionError("Transfer succeeded unexpectedly");
186     }
187 
188     try {
189       Files.write(fileName, data);
190     } catch (IOException e) {
191       logger.atSevere().log("Failed to write to output file `%s`", fileName);
192       throw new AssertionError("Failed to write output file from server", e);
193     }
194   }
195 
WriteToServer(int resourceId, Path fileName, TransferClient client, Status expected_status, int initial_offset)196   public static void WriteToServer(int resourceId,
197       Path fileName,
198       TransferClient client,
199       Status expected_status,
200       int initial_offset) {
201     if (Files.notExists(fileName)) {
202       logger.atSevere().log("Input file `%s` does not exist", fileName);
203     }
204 
205     byte[] data;
206     try {
207       data = Files.readAllBytes(fileName);
208     } catch (IOException e) {
209       logger.atSevere().log("Failed to read input file `%s`", fileName);
210       throw new AssertionError("Failed to read input file on write to server", e);
211     }
212 
213     try {
214       client.write(resourceId, data, initial_offset).get();
215     } catch (ExecutionException e) {
216       if (((TransferError) e.getCause()).status() != expected_status) {
217         throw new AssertionError("Unexpected transfer write failure", e);
218       }
219       return;
220     } catch (InterruptedException e) {
221       throw new AssertionError("Write to server failed", e);
222     }
223 
224     if (expected_status != Status.OK) {
225       throw new AssertionError("Transfer succeeded unexpectedly");
226     }
227   }
228 
main(String[] args)229   public static void main(String[] args) {
230     if (args.length != 1) {
231       logger.atSevere().log("Usage: PORT");
232       System.exit(1);
233     }
234 
235     // The port is provided directly as a commandline argument.
236     int port = Integer.parseInt(args[0]);
237 
238     ConfigProtos.ClientConfig config;
239     try {
240       config = ParseConfigFrom(System.in);
241     } catch (IOException e) {
242       throw new AssertionError("Failed to parse config file from stdin", e);
243     }
244 
245     Socket socket;
246     try {
247       socket = new Socket(HOSTNAME, port);
248     } catch (IOException e) {
249       logger.atSevere().log("Failed to connect to %s:%d", HOSTNAME, port);
250       throw new AssertionError("Failed to connect to server/proxy port", e);
251     }
252     try {
253       socket.setSendBufferSize(MAX_SOCKET_SEND_BUFFER_SIZE);
254     } catch (SocketException e) {
255       logger.atSevere().log("Invalid socket buffer size %d", MAX_SOCKET_SEND_BUFFER_SIZE);
256       throw new AssertionError("Invalid socket buffer size", e);
257     }
258     InputStream reader;
259     OutputStream writer;
260 
261     try {
262       writer = socket.getOutputStream();
263       reader = socket.getInputStream();
264     } catch (IOException e) {
265       throw new AssertionError("Failed to open socket streams", e);
266     }
267 
268     JavaClient hdlc_rpc_client = new JavaClient(writer, reader);
269 
270     hdlc_rpc_client.startClient();
271 
272     TransferClient client = new TransferClient(
273         hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Read"),
274         hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Write"),
275         TransferTimeoutSettings.builder()
276             .setTimeoutMillis(config.getChunkTimeoutMs())
277             .setInitialTimeoutMillis(config.getInitialChunkTimeoutMs())
278             .setMaxRetries(config.getMaxRetries())
279             .setMaxLifetimeRetries(config.getMaxLifetimeRetries())
280             .build());
281 
282     for (ConfigProtos.TransferAction action : config.getTransferActionsList()) {
283       int resourceId = action.getResourceId();
284       Path fileName = Paths.get(action.getFilePath());
285 
286       if (action.getProtocolVersion() != TransferAction.ProtocolVersion.UNKNOWN_VERSION) {
287         client.setProtocolVersion(ProtocolVersion.values()[action.getProtocolVersionValue()]);
288       } else {
289         client.setProtocolVersion(ProtocolVersion.latest());
290       }
291       try {
292         if (action.getTransferType() == ConfigProtos.TransferAction.TransferType.WRITE_TO_SERVER) {
293           WriteToServer(resourceId,
294               fileName,
295               client,
296               Status.fromCode(action.getExpectedStatus().getNumber()),
297               action.getInitialOffset());
298         } else if (action.getTransferType()
299             == ConfigProtos.TransferAction.TransferType.READ_FROM_SERVER) {
300           ReadFromServer(resourceId,
301               fileName,
302               client,
303               Status.fromCode(action.getExpectedStatus().getNumber()),
304               action.getInitialOffset());
305         } else {
306           throw new AssertionError("Unknown transfer action type");
307         }
308       } catch (AssertionError e) {
309         System.exit(1);
310       }
311     }
312 
313     logger.atInfo().log("Transfer completed successfully");
314 
315     System.exit(0);
316   }
317 }
318