// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

package dev.pigweed.pw_transfer;

import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
import static java.lang.Math.max;
import static java.lang.Math.min;

import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/**
 * A receive (read) transfer whose result is the received data as a {@code byte[]}.
 *
 * <p>Data chunks are accepted strictly in order. The size of the window requested from the
 * transmitter is a multiple of the configured maximum chunk size; the multiplier grows while chunks
 * arrive in order and shrinks when a retransmission has to be requested, in the manner of the TCP
 * slow start and congestion avoidance phases.
 */
class ReadTransfer extends Transfer<byte[]> {
  private static final Logger logger = Logger.forClass(ReadTransfer.class);

  // The fractional position within a window at which a receive transfer should
  // extend its window size to minimize the amount of time the transmitter
  // spends blocked.
  //
  // For example, a divisor of 2 will extend the window when half of the
  // requested data has been received, a divisor of three will extend at a third
  // of the window, and so on.
  private static final int EXTEND_WINDOW_DIVISOR = 2;

  // To minimize copies, store the ByteBuffers directly from the chunk protos in a
  // list.
  private final List<ByteBuffer> dataChunks = new ArrayList<>();

  // Total number of bytes held in dataChunks.
  private int totalDataSize = 0;

  private final TransferParameters parameters;

  // Most recent estimate of the number of bytes still to be received, or
  // UNKNOWN_TRANSFER_SIZE if the transmitter has not reported one.
  private long remainingTransferSize = UNKNOWN_TRANSFER_SIZE;

  // Offset at which the currently requested window ends.
  private int windowEndOffset = 0;

  // Size of the current window in bytes: windowSizeMultiplier * maxChunkSizeBytes.
  private int windowSize = 0;
  private int windowSizeMultiplier = 1;
  private TransmitPhase transmitPhase = TransmitPhase.SLOW_START;

  // Offset of the most recently seen data chunk; used by DropRecovery to detect
  // retried packets.
  private int lastReceivedOffset = 0;

  // Slow start and congestion avoidance are analogues to the equally named
  // phases in TCP congestion control.
  private enum TransmitPhase {
    SLOW_START,
    CONGESTION_AVOIDANCE,
  }

  // The type of data transmission the transfer is requesting.
  private enum TransmitAction {
    // Immediate parameters sent at the start of a new transfer for legacy
    // compatibility.
    BEGIN,

    // Initial parameters chunk following the opening handshake.
    FIRST_PARAMETERS,

    // Extend the current transmission window.
    EXTEND,

    // Rewind the transfer to a certain offset following data loss.
    RETRANSMIT,
  }

  /**
   * Creates a read transfer whose initial window is a single chunk of {@code
   * transferParameters.maxChunkSizeBytes()} bytes.
   *
   * <p>All arguments other than {@code transferParameters} are forwarded unchanged to the {@link
   * Transfer} constructor.
   *
   * @param transferParameters receive parameters (maximum chunk size, maximum chunks per window,
   *     chunk delay) advertised to the transmitter
   */
  ReadTransfer(int resourceId,
      int sessionId,
      ProtocolVersion desiredProtocolVersion,
      TransferInterface transferManager,
      TransferTimeoutSettings timeoutSettings,
      TransferParameters transferParameters,
      Consumer<TransferProgress> progressCallback,
      BooleanSupplier shouldAbortCallback,
      int initialOffset) {
    super(resourceId,
        sessionId,
        desiredProtocolVersion,
        transferManager,
        timeoutSettings,
        progressCallback,
        shouldAbortCallback,
        initialOffset);
    this.parameters = transferParameters;
    // NOTE(review): later window updates compute the end as getOffset() + windowSize, whereas this
    // initial value does not add initialOffset. Confirm this is intended for transfers that start
    // at a nonzero offset.
    this.windowEndOffset = parameters.maxChunkSizeBytes();
    this.windowSize = parameters.maxChunkSizeBytes();
  }

  /** Returns the transfer parameters this transfer was constructed with. */
  final TransferParameters getParametersForTest() {
    return parameters;
  }

  @Override
  State getWaitingForDataState() {
    return new ReceivingData();
  }

  /** Adds the initial offset and the current transfer parameters to the opening chunk. */
  @Override
  void prepareInitialChunk(VersionedChunk.Builder chunk) {
    chunk.setInitialOffset(getOffset());
    setTransferParameters(chunk);
  }

  /**
   * Returns the chunk to send when retrying.
   *
   * <p>This is the last chunk sent, unless that chunk was a parameters chunk, in which case a
   * freshly prepared RETRANSMIT parameters chunk is returned instead. Preparing that chunk also
   * updates the window state.
   */
  @Override
  VersionedChunk getChunkForRetry() {
    VersionedChunk chunk = getLastChunkSent();
    // If the last chunk sent was transfer parameters, send an updated RETRANSMIT
    // chunk.
    if (chunk.type() == Chunk.Type.PARAMETERS_CONTINUE
        || chunk.type() == Chunk.Type.PARAMETERS_RETRANSMIT) {
      return prepareTransferParameters(TransmitAction.RETRANSMIT);
    }
    return chunk;
  }

  /** State in which in-order data chunks are accepted and appended to the result. */
  private class ReceivingData extends ActiveState {
    @Override
    public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
      // Track the last seen offset so the DropRecovery state can detect retried
      // packets.
      lastReceivedOffset = chunk.offset();

      if (chunk.offset() != getOffset()) {
        // If the chunk's data has already been received, don't go through a full
        // recovery cycle to avoid shrinking the window size and potentially
        // thrashing. The expected data may already be in-flight, so just allow
        // the transmitter to keep going with a CONTINUE parameters chunk.
        if (chunk.offset() + chunk.data().size() <= getOffset()) {
          logger.atFine().log(
              "%s received duplicate chunk with offset %d", ReadTransfer.this, chunk.offset());
          // update=false: resend the current window without growing it.
          sendChunk(prepareTransferParameters(TransmitAction.EXTEND, false));
        } else {
          logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
              ReadTransfer.this,
              getOffset(),
              chunk.offset());

          // For now, only in-order transfers are supported. If data is received out of
          // order, discard this data and retransmit from the last received offset.
          sendChunk(prepareTransferParameters(TransmitAction.RETRANSMIT));
          changeState(new DropRecovery());
        }
        setNextChunkTimeout();
        return;
      }

      // Add the underlying array(s) to a list to avoid making copies of the data.
      dataChunks.addAll(chunk.data().asReadOnlyByteBufferList());
      totalDataSize += chunk.data().size();

      setOffset(getOffset() + chunk.data().size());

      if (chunk.remainingBytes().isPresent()) {
        // A chunk reporting zero remaining bytes is the last one: finish with OK.
        if (chunk.remainingBytes().getAsLong() == 0) {
          setStateTerminatingAndSendFinalChunk(Status.OK);
          return;
        }

        remainingTransferSize = chunk.remainingBytes().getAsLong();
      } else if (remainingTransferSize != UNKNOWN_TRANSFER_SIZE) {
        // If remaining size was not specified, update based on the most recent
        // estimate, if any.
        remainingTransferSize = max(remainingTransferSize - chunk.data().size(), 0);
      }

      // Report an unknown total when there is no estimate, or when the estimate has been used up
      // without the transfer having finished.
      if (remainingTransferSize == UNKNOWN_TRANSFER_SIZE || remainingTransferSize == 0) {
        updateProgress(getOffset(), getOffset(), UNKNOWN_TRANSFER_SIZE);
      } else {
        updateProgress(getOffset(), getOffset(), getOffset() + remainingTransferSize);
      }

      int remainingWindowSize = windowEndOffset - getOffset();
      boolean extendWindow = remainingWindowSize <= windowSize / EXTEND_WINDOW_DIVISOR;

      // Both branches extend the window; they differ only in the log line emitted when the window
      // has been consumed entirely.
      if (remainingWindowSize == 0) {
        logger.atFinest().log(
            "%s received all pending bytes; sending transfer parameters update", ReadTransfer.this);
        sendChunk(prepareTransferParameters(TransmitAction.EXTEND));
      } else if (extendWindow) {
        sendChunk(prepareTransferParameters(TransmitAction.EXTEND));
      }
      setNextChunkTimeout();
    }
  }

  /** State for recovering from dropped packets. */
  private class DropRecovery extends ActiveState {
    @Override
    public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
      if (chunk.offset() == getOffset()) {
        logger.atFine().log(
            "%s received expected offset %d, resuming transfer", ReadTransfer.this, getOffset());
        // Switch back to ReceivingData and let it process this chunk.
        changeState(new ReceivingData()).handleDataChunk(chunk);
        return;
      }

      // To avoid a flood of identical parameters packets, only send one if a retry is
      // detected.
      if (chunk.offset() == lastReceivedOffset) {
        logger.atFiner().log(
            "%s received repeated offset %d: retry detected, resending transfer parameters",
            ReadTransfer.this,
            lastReceivedOffset);
        sendChunk(prepareTransferParameters(TransmitAction.RETRANSMIT));
      } else {
        lastReceivedOffset = chunk.offset();
        logger.atFiner().log("%s expecting offset %d, ignoring received offset %d",
            ReadTransfer.this,
            getOffset(),
            chunk.offset());
      }
      setNextChunkTimeout();
    }
  }

  /**
   * Reports final progress and sets the transfer's result to the concatenation of all received
   * chunks, in the order they were received.
   */
  @Override
  void setFutureResult() {
    updateProgress(totalDataSize, totalDataSize, totalDataSize);

    // The buffers were stored without copying; this is the single copy into the result array.
    ByteBuffer result = ByteBuffer.allocate(totalDataSize);
    dataChunks.forEach(result::put);
    set(result.array());
  }

  /** Prepares a parameters chunk for {@code action}, updating the window state. */
  private VersionedChunk prepareTransferParameters(TransmitAction action) {
    return prepareTransferParameters(action, true);
  }

  /**
   * Builds the parameters chunk for the given action.
   *
   * @param action the kind of transmission being requested; determines the chunk type
   * @param update if true, adjust the window multiplier (and, on RETRANSMIT, the transmit phase)
   *     for the action and recompute the window size and end offset; if false, the chunk carries
   *     the existing window end offset
   * @return the built chunk, carrying the current offset and window end offset
   */
  private VersionedChunk prepareTransferParameters(TransmitAction action, boolean update) {
    Chunk.Type type;

    switch (action) {
      case BEGIN:
        // Initial window is always one chunk. No special handling required.
        type = Chunk.Type.START;
        break;

      case FIRST_PARAMETERS:
        // Initial window is always one chunk. No special handling required.
        type = Chunk.Type.PARAMETERS_RETRANSMIT;
        break;

      case EXTEND:
        // Window was received successfully without packet loss and should grow.
        // Double the window size during slow start, or increase it by a single
        // chunk in congestion avoidance.
        type = Chunk.Type.PARAMETERS_CONTINUE;

        if (update) {
          if (transmitPhase == TransmitPhase.SLOW_START) {
            windowSizeMultiplier *= 2;
          } else {
            windowSizeMultiplier += 1;
          }

          // The window size can never exceed the user-specified maximum bytes. If
          // it does, reduce the multiplier to the largest size that fits.
          windowSizeMultiplier = min(windowSizeMultiplier, parameters.maxChunksInWindow());
        }
        break;

      case RETRANSMIT:
        // A packet was lost: shrink the window size. Additionally, after the
        // first packet loss, transition from the slow start to the congestion
        // avoidance phase of the transfer.
        type = Chunk.Type.PARAMETERS_RETRANSMIT;
        if (update) {
          windowSizeMultiplier = max(windowSizeMultiplier / 2, 1);
          if (transmitPhase == TransmitPhase.SLOW_START) {
            transmitPhase = TransmitPhase.CONGESTION_AVOIDANCE;
          }
        }
        break;

      default:
        throw new AssertionError("Invalid transmit action");
    }

    if (update) {
      // The window always starts at the current offset.
      windowSize = windowSizeMultiplier * parameters.maxChunkSizeBytes();
      windowEndOffset = getOffset() + windowSize;
    }

    return setTransferParameters(newChunk(type)).build();
  }

  /**
   * Writes the maximum chunk size, current offset and window end offset into {@code chunk}, plus
   * the minimum chunk delay when one is configured (greater than zero).
   *
   * @return the same builder, for chaining
   */
  private VersionedChunk.Builder setTransferParameters(VersionedChunk.Builder chunk) {
    chunk.setMaxChunkSizeBytes(parameters.maxChunkSizeBytes())
        .setOffset(getOffset())
        .setWindowEndOffset(windowEndOffset);
    if (parameters.chunkDelayMicroseconds() > 0) {
      chunk.setMinDelayMicroseconds(parameters.chunkDelayMicroseconds());
    }
    return chunk;
  }
}