xref: /aosp_15_r20/external/pigweed/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 package dev.pigweed.pw_transfer;
16 
17 import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
18 import static java.lang.Math.max;
19 import static java.lang.Math.min;
20 
21 import dev.pigweed.pw_log.Logger;
22 import dev.pigweed.pw_rpc.Status;
23 import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.function.BooleanSupplier;
28 import java.util.function.Consumer;
29 
30 class ReadTransfer extends Transfer<byte[]> {
31   private static final Logger logger = Logger.forClass(ReadTransfer.class);
32 
33   // The fractional position within a window at which a receive transfer should
34   // extend its window size to minimize the amount of time the transmitter
35   // spends blocked.
36   //
37   // For example, a divisor of 2 will extend the window when half of the
38   // requested data has been received, a divisor of three will extend at a third
39   // of the window, and so on.
40   private static final int EXTEND_WINDOW_DIVISOR = 2;
41 
42   // To minimize copies, store the ByteBuffers directly from the chunk protos in a
43   // list.
44   private final List<ByteBuffer> dataChunks = new ArrayList<>();
45   private int totalDataSize = 0;
46 
47   private final TransferParameters parameters;
48 
49   private long remainingTransferSize = UNKNOWN_TRANSFER_SIZE;
50 
51   private int windowEndOffset = 0;
52 
53   private int windowSize = 0;
54   private int windowSizeMultiplier = 1;
55   private TransmitPhase transmitPhase = TransmitPhase.SLOW_START;
56 
57   private int lastReceivedOffset = 0;
58 
59   // Slow start and congestion avoidance are analogues to the equally named phases
60   // in TCP congestion
61   // control.
62   private enum TransmitPhase {
63     SLOW_START,
64     CONGESTION_AVOIDANCE,
65   }
66 
67   // The type of data transmission the transfer is requesting.
68   private enum TransmitAction {
69     // Immediate parameters sent at the start of a new transfer for legacy
70     // compatibility.
71     BEGIN,
72 
73     // Initial parameters chunk following the opening handshake.
74     FIRST_PARAMETERS,
75 
76     // Extend the current transmission window.
77     EXTEND,
78 
79     // Rewind the transfer to a certain offset following data loss.
80     RETRANSMIT,
81   }
82 
ReadTransfer(int resourceId, int sessionId, ProtocolVersion desiredProtocolVersion, TransferInterface transferManager, TransferTimeoutSettings timeoutSettings, TransferParameters transferParameters, Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback, int initialOffset)83   ReadTransfer(int resourceId,
84       int sessionId,
85       ProtocolVersion desiredProtocolVersion,
86       TransferInterface transferManager,
87       TransferTimeoutSettings timeoutSettings,
88       TransferParameters transferParameters,
89       Consumer<TransferProgress> progressCallback,
90       BooleanSupplier shouldAbortCallback,
91       int initialOffset) {
92     super(resourceId,
93         sessionId,
94         desiredProtocolVersion,
95         transferManager,
96         timeoutSettings,
97         progressCallback,
98         shouldAbortCallback,
99         initialOffset);
100     this.parameters = transferParameters;
101     this.windowEndOffset = parameters.maxChunkSizeBytes();
102     this.windowSize = parameters.maxChunkSizeBytes();
103   }
104 
getParametersForTest()105   final TransferParameters getParametersForTest() {
106     return parameters;
107   }
108 
109   @Override
getWaitingForDataState()110   State getWaitingForDataState() {
111     return new ReceivingData();
112   }
113 
114   @Override
prepareInitialChunk(VersionedChunk.Builder chunk)115   void prepareInitialChunk(VersionedChunk.Builder chunk) {
116     chunk.setInitialOffset(getOffset());
117     setTransferParameters(chunk);
118   }
119 
120   @Override
getChunkForRetry()121   VersionedChunk getChunkForRetry() {
122     VersionedChunk chunk = getLastChunkSent();
123     // If the last chunk sent was transfer parameters, send an updated RETRANSMIT
124     // chunk.
125     if (chunk.type() == Chunk.Type.PARAMETERS_CONTINUE
126         || chunk.type() == Chunk.Type.PARAMETERS_RETRANSMIT) {
127       return prepareTransferParameters(TransmitAction.RETRANSMIT);
128     }
129     return chunk;
130   }
131 
132   private class ReceivingData extends ActiveState {
133     @Override
handleDataChunk(VersionedChunk chunk)134     public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
135       // Track the last seen offset so the DropRecovery state can detect retried
136       // packets.
137       lastReceivedOffset = chunk.offset();
138 
139       if (chunk.offset() != getOffset()) {
140         // If the chunk's data has already been received, don't go through a full
141         // recovery cycle to avoid shrinking the window size and potentially
142         // thrashing. The expected data may already be in-flight, so just allow
143         // the transmitter to keep going with a CONTINUE parameters chunk.
144         if (chunk.offset() + chunk.data().size() <= getOffset()) {
145           logger.atFine().log("%s received duplicate chunk with offset offset %d",
146               ReadTransfer.this,
147               chunk.offset());
148           sendChunk(prepareTransferParameters(TransmitAction.EXTEND, false));
149         } else {
150           logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
151               ReadTransfer.this,
152               getOffset(),
153               chunk.offset());
154 
155           // For now, only in-order transfers are supported. If data is received out of
156           // order, discard this data and retransmit from the last received offset.
157           sendChunk(prepareTransferParameters(TransmitAction.RETRANSMIT));
158           changeState(new DropRecovery());
159         }
160         setNextChunkTimeout();
161         return;
162       }
163 
164       // Add the underlying array(s) to a list to avoid making copies of the data.
165       dataChunks.addAll(chunk.data().asReadOnlyByteBufferList());
166       totalDataSize += chunk.data().size();
167 
168       setOffset(getOffset() + chunk.data().size());
169 
170       if (chunk.remainingBytes().isPresent()) {
171         if (chunk.remainingBytes().getAsLong() == 0) {
172           setStateTerminatingAndSendFinalChunk(Status.OK);
173           return;
174         }
175 
176         remainingTransferSize = chunk.remainingBytes().getAsLong();
177       } else if (remainingTransferSize != UNKNOWN_TRANSFER_SIZE) {
178         // If remaining size was not specified, update based on the most recent
179         // estimate, if any.
180         remainingTransferSize = max(remainingTransferSize - chunk.data().size(), 0);
181       }
182 
183       if (remainingTransferSize == UNKNOWN_TRANSFER_SIZE || remainingTransferSize == 0) {
184         updateProgress(getOffset(), getOffset(), UNKNOWN_TRANSFER_SIZE);
185       } else {
186         updateProgress(getOffset(), getOffset(), getOffset() + remainingTransferSize);
187       }
188 
189       int remainingWindowSize = windowEndOffset - getOffset();
190       boolean extendWindow = remainingWindowSize <= windowSize / EXTEND_WINDOW_DIVISOR;
191 
192       if (remainingWindowSize == 0) {
193         logger.atFinest().log(
194             "%s received all pending bytes; sending transfer parameters update", ReadTransfer.this);
195         sendChunk(prepareTransferParameters(TransmitAction.EXTEND));
196       } else if (extendWindow) {
197         sendChunk(prepareTransferParameters(TransmitAction.EXTEND));
198       }
199       setNextChunkTimeout();
200     }
201   }
202 
203   /** State for recovering from dropped packets. */
204   private class DropRecovery extends ActiveState {
205     @Override
handleDataChunk(VersionedChunk chunk)206     public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
207       if (chunk.offset() == getOffset()) {
208         logger.atFine().log(
209             "%s received expected offset %d, resuming transfer", ReadTransfer.this, getOffset());
210         changeState(new ReceivingData()).handleDataChunk(chunk);
211         return;
212       }
213 
214       // To avoid a flood of identical parameters packets, only send one if a retry is
215       // detected.
216       if (chunk.offset() == lastReceivedOffset) {
217         logger.atFiner().log(
218             "%s received repeated offset %d: retry detected, resending transfer parameters",
219             ReadTransfer.this,
220             lastReceivedOffset);
221         sendChunk(prepareTransferParameters(TransmitAction.RETRANSMIT));
222       } else {
223         lastReceivedOffset = chunk.offset();
224         logger.atFiner().log("%s expecting offset %d, ignoring received offset %d",
225             ReadTransfer.this,
226             getOffset(),
227             chunk.offset());
228       }
229       setNextChunkTimeout();
230     }
231   }
232 
233   @Override
setFutureResult()234   void setFutureResult() {
235     updateProgress(totalDataSize, totalDataSize, totalDataSize);
236 
237     ByteBuffer result = ByteBuffer.allocate(totalDataSize);
238     dataChunks.forEach(result::put);
239     set(result.array());
240   }
241 
prepareTransferParameters(TransmitAction action)242   private VersionedChunk prepareTransferParameters(TransmitAction action) {
243     return prepareTransferParameters(action, true);
244   }
245 
prepareTransferParameters(TransmitAction action, boolean update)246   private VersionedChunk prepareTransferParameters(TransmitAction action, boolean update) {
247     Chunk.Type type;
248 
249     switch (action) {
250       case BEGIN:
251         // Initial window is always one chunk. No special handling required.
252         type = Chunk.Type.START;
253         break;
254 
255       case FIRST_PARAMETERS:
256         // Initial window is always one chunk. No special handling required.
257         type = Chunk.Type.PARAMETERS_RETRANSMIT;
258         break;
259 
260       case EXTEND:
261         // Window was received successfully without packet loss and should grow. Double
262         // the window
263         // size during slow start, or increase it by a single chunk in congestion
264         // avoidance.
265         type = Chunk.Type.PARAMETERS_CONTINUE;
266 
267         if (update) {
268           if (transmitPhase == TransmitPhase.SLOW_START) {
269             windowSizeMultiplier *= 2;
270           } else {
271             windowSizeMultiplier += 1;
272           }
273 
274           // The window size can never exceed the user-specified maximum bytes. If it
275           // does, reduce
276           // the multiplier to the largest size that fits.
277           windowSizeMultiplier = min(windowSizeMultiplier, parameters.maxChunksInWindow());
278         }
279         break;
280 
281       case RETRANSMIT:
282         // A packet was lost: shrink the window size. Additionally, after the first
283         // packet loss,
284         // transition from the slow start to the congestion avoidance phase of the
285         // transfer.
286         type = Chunk.Type.PARAMETERS_RETRANSMIT;
287         if (update) {
288           windowSizeMultiplier = max(windowSizeMultiplier / 2, 1);
289           if (transmitPhase == TransmitPhase.SLOW_START) {
290             transmitPhase = TransmitPhase.CONGESTION_AVOIDANCE;
291           }
292         }
293         break;
294 
295       default:
296         throw new AssertionError("Invalid transmit action");
297     }
298 
299     if (update) {
300       windowSize = windowSizeMultiplier * parameters.maxChunkSizeBytes();
301       windowEndOffset = getOffset() + windowSize;
302     }
303 
304     return setTransferParameters(newChunk(type)).build();
305   }
306 
setTransferParameters(VersionedChunk.Builder chunk)307   private VersionedChunk.Builder setTransferParameters(VersionedChunk.Builder chunk) {
308     chunk.setMaxChunkSizeBytes(parameters.maxChunkSizeBytes())
309         .setOffset(getOffset())
310         .setWindowEndOffset(windowEndOffset);
311     if (parameters.chunkDelayMicroseconds() > 0) {
312       chunk.setMinDelayMicroseconds(parameters.chunkDelayMicroseconds());
313     }
314     return chunk;
315   }
316 }
317