1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 import com.google.common.collect.ImmutableList; 16 import com.google.protobuf.TextFormat; 17 import dev.pigweed.pw_hdlc.Decoder; 18 import dev.pigweed.pw_hdlc.Encoder; 19 import dev.pigweed.pw_hdlc.Frame; 20 import dev.pigweed.pw_log.Logger; 21 import dev.pigweed.pw_rpc.Channel; 22 import dev.pigweed.pw_rpc.ChannelOutputException; 23 import dev.pigweed.pw_rpc.Client; 24 import dev.pigweed.pw_rpc.Status; 25 import dev.pigweed.pw_transfer.ProtocolVersion; 26 import dev.pigweed.pw_transfer.TransferClient; 27 import dev.pigweed.pw_transfer.TransferError; 28 import dev.pigweed.pw_transfer.TransferService; 29 import dev.pigweed.pw_transfer.TransferTimeoutSettings; 30 import java.io.IOException; 31 import java.io.InputStream; 32 import java.io.OutputStream; 33 import java.net.Socket; 34 import java.net.SocketException; 35 import java.nio.ByteBuffer; 36 import java.nio.charset.StandardCharsets; 37 import java.nio.file.Files; 38 import java.nio.file.Path; 39 import java.nio.file.Paths; 40 import java.util.concurrent.ExecutionException; 41 import pw.transfer.ConfigProtos; 42 import pw.transfer.ConfigProtos.TransferAction; 43 44 public class JavaClient { 45 private static final String SERVICE = "pw.transfer.Transfer"; 46 private static final Logger logger = Logger.forClass(Client.class); 47 48 private static final int CHANNEL_ID = 1; 49 private static final long RPC_HDLC_ADDRESS = 'R'; 50 private static final String HOSTNAME = "localhost"; 51 52 // This is the maximum size of the socket send buffers. Ideally, this is set 53 // to the lowest allowed value to minimize buffering between the proxy and 54 // clients so rate limiting causes the client to block and wait for the 55 // integration test proxy to drain rather than allowing OS buffers to backlog 56 // large quantities of data. 57 // 58 // Note that the OS may chose to not strictly follow this requested buffer 59 // size. Still, setting this value to be as small as possible does reduce 60 // bufer sizes significantly enough to better reflect typical inter-device 61 // communication. 62 // 63 // For this to be effective, servers should also configure their sockets to a 64 // smaller receive buffer size. 65 private static final int MAX_SOCKET_SEND_BUFFER_SIZE = 1; 66 67 private final HdlcRpcChannelOutput channelOutput; 68 private final Client rpcClient; 69 private final HdlcParseThread parseThread; 70 JavaClient(OutputStream writer, InputStream reader)71 public JavaClient(OutputStream writer, InputStream reader) { 72 this.channelOutput = new HdlcRpcChannelOutput(writer, RPC_HDLC_ADDRESS); 73 74 this.rpcClient = Client.create(ImmutableList.of(new Channel(CHANNEL_ID, this.channelOutput)), 75 ImmutableList.of(TransferService.get())); 76 77 this.parseThread = new HdlcParseThread(reader, this.rpcClient); 78 } 79 startClient()80 void startClient() { 81 parseThread.start(); 82 } 83 getRpcClient()84 Client getRpcClient() { 85 return this.rpcClient; 86 } 87 88 private class HdlcRpcChannelOutput implements Channel.Output { 89 private final OutputStream writer; 90 private final long address; 91 HdlcRpcChannelOutput(OutputStream writer, long address)92 public HdlcRpcChannelOutput(OutputStream writer, long address) { 93 this.writer = writer; 94 this.address = address; 95 } 96 send(byte[] packet)97 public void send(byte[] packet) throws ChannelOutputException { 98 try { 99 Encoder.writeUiFrame(this.address, ByteBuffer.wrap(packet), this.writer); 100 } catch (IOException e) { 101 throw new ChannelOutputException("Failed to write HDLC UI Frame", e); 102 } 103 } 104 } 105 106 private class HdlcParseThread extends Thread { 107 private final InputStream reader; 108 private final RpcOnComplete frame_handler; 109 private final Decoder decoder; 110 111 private class RpcOnComplete implements Decoder.OnCompleteFrame { 112 private final Client rpc_client; 113 RpcOnComplete(Client rpc_client)114 public RpcOnComplete(Client rpc_client) { 115 this.rpc_client = rpc_client; 116 } 117 onCompleteFrame(Frame frame)118 public void onCompleteFrame(Frame frame) { 119 if (frame.getAddress() == RPC_HDLC_ADDRESS) { 120 this.rpc_client.processPacket(frame.getPayload()); 121 } 122 } 123 } 124 HdlcParseThread(InputStream reader, Client rpc_client)125 public HdlcParseThread(InputStream reader, Client rpc_client) { 126 this.reader = reader; 127 this.frame_handler = new RpcOnComplete(rpc_client); 128 this.decoder = new Decoder(this.frame_handler); 129 } 130 run()131 public void run() { 132 while (true) { 133 int val = 0; 134 try { 135 val = this.reader.read(); 136 } catch (IOException e) { 137 logger.atSevere().log("HDLC parse thread read failed"); 138 System.exit(1); 139 } 140 this.decoder.process((byte) val); 141 } 142 } 143 } 144 ParseConfigFrom(InputStream reader)145 public static ConfigProtos.ClientConfig ParseConfigFrom(InputStream reader) throws IOException { 146 byte[] buffer = new byte[reader.available()]; 147 reader.read(buffer); 148 ConfigProtos.ClientConfig.Builder config_builder = ConfigProtos.ClientConfig.newBuilder(); 149 TextFormat.merge(new String(buffer, StandardCharsets.UTF_8), config_builder); 150 if (config_builder.getChunkTimeoutMs() == 0) { 151 throw new AssertionError("chunk_timeout_ms may not be 0"); 152 } 153 if (config_builder.getInitialChunkTimeoutMs() == 0) { 154 throw new AssertionError("initial_chunk_timeout_ms may not be 0"); 155 } 156 if (config_builder.getMaxRetries() == 0) { 157 throw new AssertionError("max_retries may not be 0"); 158 } 159 if (config_builder.getMaxLifetimeRetries() == 0) { 160 throw new AssertionError("max_lifetime_retries may not be 0"); 161 } 162 return config_builder.build(); 163 } 164 ReadFromServer(int resourceId, Path fileName, TransferClient client, Status expected_status, int initial_offset)165 public static void ReadFromServer(int resourceId, 166 Path fileName, 167 TransferClient client, 168 Status expected_status, 169 int initial_offset) { 170 byte[] data; 171 try { 172 data = client.read(resourceId, initial_offset).get(); 173 } catch (ExecutionException e) { 174 if (((TransferError) e.getCause()).status() != expected_status) { 175 throw new AssertionError("Unexpected transfer read failure", e); 176 } 177 // Expected failure occurred, skip trying to write the data knowing that 178 // it is missing. 179 return; 180 } catch (InterruptedException e) { 181 throw new AssertionError("Read from server failed", e); 182 } 183 184 if (expected_status != Status.OK) { 185 throw new AssertionError("Transfer succeeded unexpectedly"); 186 } 187 188 try { 189 Files.write(fileName, data); 190 } catch (IOException e) { 191 logger.atSevere().log("Failed to write to output file `%s`", fileName); 192 throw new AssertionError("Failed to write output file from server", e); 193 } 194 } 195 WriteToServer(int resourceId, Path fileName, TransferClient client, Status expected_status, int initial_offset)196 public static void WriteToServer(int resourceId, 197 Path fileName, 198 TransferClient client, 199 Status expected_status, 200 int initial_offset) { 201 if (Files.notExists(fileName)) { 202 logger.atSevere().log("Input file `%s` does not exist", fileName); 203 } 204 205 byte[] data; 206 try { 207 data = Files.readAllBytes(fileName); 208 } catch (IOException e) { 209 logger.atSevere().log("Failed to read input file `%s`", fileName); 210 throw new AssertionError("Failed to read input file on write to server", e); 211 } 212 213 try { 214 client.write(resourceId, data, initial_offset).get(); 215 } catch (ExecutionException e) { 216 if (((TransferError) e.getCause()).status() != expected_status) { 217 throw new AssertionError("Unexpected transfer write failure", e); 218 } 219 return; 220 } catch (InterruptedException e) { 221 throw new AssertionError("Write to server failed", e); 222 } 223 224 if (expected_status != Status.OK) { 225 throw new AssertionError("Transfer succeeded unexpectedly"); 226 } 227 } 228 main(String[] args)229 public static void main(String[] args) { 230 if (args.length != 1) { 231 logger.atSevere().log("Usage: PORT"); 232 System.exit(1); 233 } 234 235 // The port is provided directly as a commandline argument. 236 int port = Integer.parseInt(args[0]); 237 238 ConfigProtos.ClientConfig config; 239 try { 240 config = ParseConfigFrom(System.in); 241 } catch (IOException e) { 242 throw new AssertionError("Failed to parse config file from stdin", e); 243 } 244 245 Socket socket; 246 try { 247 socket = new Socket(HOSTNAME, port); 248 } catch (IOException e) { 249 logger.atSevere().log("Failed to connect to %s:%d", HOSTNAME, port); 250 throw new AssertionError("Failed to connect to server/proxy port", e); 251 } 252 try { 253 socket.setSendBufferSize(MAX_SOCKET_SEND_BUFFER_SIZE); 254 } catch (SocketException e) { 255 logger.atSevere().log("Invalid socket buffer size %d", MAX_SOCKET_SEND_BUFFER_SIZE); 256 throw new AssertionError("Invalid socket buffer size", e); 257 } 258 InputStream reader; 259 OutputStream writer; 260 261 try { 262 writer = socket.getOutputStream(); 263 reader = socket.getInputStream(); 264 } catch (IOException e) { 265 throw new AssertionError("Failed to open socket streams", e); 266 } 267 268 JavaClient hdlc_rpc_client = new JavaClient(writer, reader); 269 270 hdlc_rpc_client.startClient(); 271 272 TransferClient client = new TransferClient( 273 hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Read"), 274 hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Write"), 275 TransferTimeoutSettings.builder() 276 .setTimeoutMillis(config.getChunkTimeoutMs()) 277 .setInitialTimeoutMillis(config.getInitialChunkTimeoutMs()) 278 .setMaxRetries(config.getMaxRetries()) 279 .setMaxLifetimeRetries(config.getMaxLifetimeRetries()) 280 .build()); 281 282 for (ConfigProtos.TransferAction action : config.getTransferActionsList()) { 283 int resourceId = action.getResourceId(); 284 Path fileName = Paths.get(action.getFilePath()); 285 286 if (action.getProtocolVersion() != TransferAction.ProtocolVersion.UNKNOWN_VERSION) { 287 client.setProtocolVersion(ProtocolVersion.values()[action.getProtocolVersionValue()]); 288 } else { 289 client.setProtocolVersion(ProtocolVersion.latest()); 290 } 291 try { 292 if (action.getTransferType() == ConfigProtos.TransferAction.TransferType.WRITE_TO_SERVER) { 293 WriteToServer(resourceId, 294 fileName, 295 client, 296 Status.fromCode(action.getExpectedStatus().getNumber()), 297 action.getInitialOffset()); 298 } else if (action.getTransferType() 299 == ConfigProtos.TransferAction.TransferType.READ_FROM_SERVER) { 300 ReadFromServer(resourceId, 301 fileName, 302 client, 303 Status.fromCode(action.getExpectedStatus().getNumber()), 304 action.getInitialOffset()); 305 } else { 306 throw new AssertionError("Unknown transfer action type"); 307 } 308 } catch (AssertionError e) { 309 System.exit(1); 310 } 311 } 312 313 logger.atInfo().log("Transfer completed successfully"); 314 315 System.exit(0); 316 } 317 } 318