// Copyright 2022 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. import com.google.common.collect.ImmutableList; import com.google.protobuf.TextFormat; import dev.pigweed.pw_hdlc.Decoder; import dev.pigweed.pw_hdlc.Encoder; import dev.pigweed.pw_hdlc.Frame; import dev.pigweed.pw_log.Logger; import dev.pigweed.pw_rpc.Channel; import dev.pigweed.pw_rpc.ChannelOutputException; import dev.pigweed.pw_rpc.Client; import dev.pigweed.pw_rpc.Status; import dev.pigweed.pw_transfer.ProtocolVersion; import dev.pigweed.pw_transfer.TransferClient; import dev.pigweed.pw_transfer.TransferError; import dev.pigweed.pw_transfer.TransferService; import dev.pigweed.pw_transfer.TransferTimeoutSettings; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.ExecutionException; import pw.transfer.ConfigProtos; import pw.transfer.ConfigProtos.TransferAction; public class JavaClient { private static final String SERVICE = "pw.transfer.Transfer"; private static final Logger logger = Logger.forClass(Client.class); private static final int CHANNEL_ID = 1; private static final long RPC_HDLC_ADDRESS = 'R'; private static final String HOSTNAME = "localhost"; // This is the maximum size of the socket send buffers. Ideally, this is set // to the lowest allowed value to minimize buffering between the proxy and // clients so rate limiting causes the client to block and wait for the // integration test proxy to drain rather than allowing OS buffers to backlog // large quantities of data. // // Note that the OS may chose to not strictly follow this requested buffer // size. Still, setting this value to be as small as possible does reduce // bufer sizes significantly enough to better reflect typical inter-device // communication. // // For this to be effective, servers should also configure their sockets to a // smaller receive buffer size. private static final int MAX_SOCKET_SEND_BUFFER_SIZE = 1; private final HdlcRpcChannelOutput channelOutput; private final Client rpcClient; private final HdlcParseThread parseThread; public JavaClient(OutputStream writer, InputStream reader) { this.channelOutput = new HdlcRpcChannelOutput(writer, RPC_HDLC_ADDRESS); this.rpcClient = Client.create(ImmutableList.of(new Channel(CHANNEL_ID, this.channelOutput)), ImmutableList.of(TransferService.get())); this.parseThread = new HdlcParseThread(reader, this.rpcClient); } void startClient() { parseThread.start(); } Client getRpcClient() { return this.rpcClient; } private class HdlcRpcChannelOutput implements Channel.Output { private final OutputStream writer; private final long address; public HdlcRpcChannelOutput(OutputStream writer, long address) { this.writer = writer; this.address = address; } public void send(byte[] packet) throws ChannelOutputException { try { Encoder.writeUiFrame(this.address, ByteBuffer.wrap(packet), this.writer); } catch (IOException e) { throw new ChannelOutputException("Failed to write HDLC UI Frame", e); } } } private class HdlcParseThread extends Thread { private final InputStream reader; private final RpcOnComplete frame_handler; private final Decoder decoder; private class RpcOnComplete implements Decoder.OnCompleteFrame { private final Client rpc_client; public RpcOnComplete(Client rpc_client) { this.rpc_client = rpc_client; } public void onCompleteFrame(Frame frame) { if (frame.getAddress() == RPC_HDLC_ADDRESS) { this.rpc_client.processPacket(frame.getPayload()); } } } public HdlcParseThread(InputStream reader, Client rpc_client) { this.reader = reader; this.frame_handler = new RpcOnComplete(rpc_client); this.decoder = new Decoder(this.frame_handler); } public void run() { while (true) { int val = 0; try { val = this.reader.read(); } catch (IOException e) { logger.atSevere().log("HDLC parse thread read failed"); System.exit(1); } this.decoder.process((byte) val); } } } public static ConfigProtos.ClientConfig ParseConfigFrom(InputStream reader) throws IOException { byte[] buffer = new byte[reader.available()]; reader.read(buffer); ConfigProtos.ClientConfig.Builder config_builder = ConfigProtos.ClientConfig.newBuilder(); TextFormat.merge(new String(buffer, StandardCharsets.UTF_8), config_builder); if (config_builder.getChunkTimeoutMs() == 0) { throw new AssertionError("chunk_timeout_ms may not be 0"); } if (config_builder.getInitialChunkTimeoutMs() == 0) { throw new AssertionError("initial_chunk_timeout_ms may not be 0"); } if (config_builder.getMaxRetries() == 0) { throw new AssertionError("max_retries may not be 0"); } if (config_builder.getMaxLifetimeRetries() == 0) { throw new AssertionError("max_lifetime_retries may not be 0"); } return config_builder.build(); } public static void ReadFromServer(int resourceId, Path fileName, TransferClient client, Status expected_status, int initial_offset) { byte[] data; try { data = client.read(resourceId, initial_offset).get(); } catch (ExecutionException e) { if (((TransferError) e.getCause()).status() != expected_status) { throw new AssertionError("Unexpected transfer read failure", e); } // Expected failure occurred, skip trying to write the data knowing that // it is missing. return; } catch (InterruptedException e) { throw new AssertionError("Read from server failed", e); } if (expected_status != Status.OK) { throw new AssertionError("Transfer succeeded unexpectedly"); } try { Files.write(fileName, data); } catch (IOException e) { logger.atSevere().log("Failed to write to output file `%s`", fileName); throw new AssertionError("Failed to write output file from server", e); } } public static void WriteToServer(int resourceId, Path fileName, TransferClient client, Status expected_status, int initial_offset) { if (Files.notExists(fileName)) { logger.atSevere().log("Input file `%s` does not exist", fileName); } byte[] data; try { data = Files.readAllBytes(fileName); } catch (IOException e) { logger.atSevere().log("Failed to read input file `%s`", fileName); throw new AssertionError("Failed to read input file on write to server", e); } try { client.write(resourceId, data, initial_offset).get(); } catch (ExecutionException e) { if (((TransferError) e.getCause()).status() != expected_status) { throw new AssertionError("Unexpected transfer write failure", e); } return; } catch (InterruptedException e) { throw new AssertionError("Write to server failed", e); } if (expected_status != Status.OK) { throw new AssertionError("Transfer succeeded unexpectedly"); } } public static void main(String[] args) { if (args.length != 1) { logger.atSevere().log("Usage: PORT"); System.exit(1); } // The port is provided directly as a commandline argument. int port = Integer.parseInt(args[0]); ConfigProtos.ClientConfig config; try { config = ParseConfigFrom(System.in); } catch (IOException e) { throw new AssertionError("Failed to parse config file from stdin", e); } Socket socket; try { socket = new Socket(HOSTNAME, port); } catch (IOException e) { logger.atSevere().log("Failed to connect to %s:%d", HOSTNAME, port); throw new AssertionError("Failed to connect to server/proxy port", e); } try { socket.setSendBufferSize(MAX_SOCKET_SEND_BUFFER_SIZE); } catch (SocketException e) { logger.atSevere().log("Invalid socket buffer size %d", MAX_SOCKET_SEND_BUFFER_SIZE); throw new AssertionError("Invalid socket buffer size", e); } InputStream reader; OutputStream writer; try { writer = socket.getOutputStream(); reader = socket.getInputStream(); } catch (IOException e) { throw new AssertionError("Failed to open socket streams", e); } JavaClient hdlc_rpc_client = new JavaClient(writer, reader); hdlc_rpc_client.startClient(); TransferClient client = new TransferClient( hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Read"), hdlc_rpc_client.getRpcClient().method(CHANNEL_ID, TransferService.get().name() + "/Write"), TransferTimeoutSettings.builder() .setTimeoutMillis(config.getChunkTimeoutMs()) .setInitialTimeoutMillis(config.getInitialChunkTimeoutMs()) .setMaxRetries(config.getMaxRetries()) .setMaxLifetimeRetries(config.getMaxLifetimeRetries()) .build()); for (ConfigProtos.TransferAction action : config.getTransferActionsList()) { int resourceId = action.getResourceId(); Path fileName = Paths.get(action.getFilePath()); if (action.getProtocolVersion() != TransferAction.ProtocolVersion.UNKNOWN_VERSION) { client.setProtocolVersion(ProtocolVersion.values()[action.getProtocolVersionValue()]); } else { client.setProtocolVersion(ProtocolVersion.latest()); } try { if (action.getTransferType() == ConfigProtos.TransferAction.TransferType.WRITE_TO_SERVER) { WriteToServer(resourceId, fileName, client, Status.fromCode(action.getExpectedStatus().getNumber()), action.getInitialOffset()); } else if (action.getTransferType() == ConfigProtos.TransferAction.TransferType.READ_FROM_SERVER) { ReadFromServer(resourceId, fileName, client, Status.fromCode(action.getExpectedStatus().getNumber()), action.getInitialOffset()); } else { throw new AssertionError("Unknown transfer action type"); } } catch (AssertionError e) { System.exit(1); } } logger.atInfo().log("Transfer completed successfully"); System.exit(0); } }