xref: /aosp_15_r20/external/pigweed/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 package dev.pigweed.pw_transfer;
16 
17 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
18 import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
19 
20 import com.google.common.util.concurrent.AbstractFuture;
21 import dev.pigweed.pw_log.Logger;
22 import dev.pigweed.pw_rpc.Status;
23 import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
24 import java.time.Duration;
25 import java.time.Instant;
26 import java.util.Locale;
27 import java.util.function.BooleanSupplier;
28 import java.util.function.Consumer;
29 
30 /** Base class for tracking the state of a read or write transfer. */
31 abstract class Transfer<T> extends AbstractFuture<T> {
32   private static final Logger logger = Logger.forClass(Transfer.class);
33 
34   // Largest nanosecond instant. Used to block indefinitely when no transfers are
35   // pending.
36   static final Instant NO_TIMEOUT = Instant.ofEpochSecond(0, Long.MAX_VALUE);
37 
38   // Whether to output some particularly noisy logs.
39   static final boolean VERBOSE_LOGGING = false;
40 
41   private final int resourceId;
42   private final int sessionId;
43   private int offset;
44   private final ProtocolVersion desiredProtocolVersion;
45   private final TransferEventHandler.TransferInterface eventHandler;
46   private final TransferTimeoutSettings timeoutSettings;
47   private final Consumer<TransferProgress> progressCallback;
48   private final BooleanSupplier shouldAbortCallback;
49   private final Instant startTime;
50 
51   private ProtocolVersion configuredProtocolVersion = ProtocolVersion.UNKNOWN;
52   private Instant deadline = NO_TIMEOUT;
53   private State state;
54   private VersionedChunk lastChunkSent;
55 
56   private int lifetimeRetries = 0;
57 
58   /**
59    * Creates a new read or write transfer.
60    *
61    * @param resourceId             The resource ID of the transfer
62    * @param desiredProtocolVersion protocol version to request
63    * @param eventHandler           Interface to use to send a chunk.
64    * @param timeoutSettings        Timeout and retry settings for this transfer.
65    * @param progressCallback       Called each time a packet is sent.
66    * @param shouldAbortCallback    BooleanSupplier that returns true if a transfer
67    *                               should be aborted.
68    */
Transfer(int resourceId, int sessionId, ProtocolVersion desiredProtocolVersion, TransferInterface eventHandler, TransferTimeoutSettings timeoutSettings, Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback, int initialOffset)69   Transfer(int resourceId,
70       int sessionId,
71       ProtocolVersion desiredProtocolVersion,
72       TransferInterface eventHandler,
73       TransferTimeoutSettings timeoutSettings,
74       Consumer<TransferProgress> progressCallback,
75       BooleanSupplier shouldAbortCallback,
76       int initialOffset) {
77     this.resourceId = resourceId;
78     this.sessionId = sessionId;
79     this.offset = initialOffset;
80     this.desiredProtocolVersion = desiredProtocolVersion;
81     this.eventHandler = eventHandler;
82 
83     this.timeoutSettings = timeoutSettings;
84     this.progressCallback = progressCallback;
85     this.shouldAbortCallback = shouldAbortCallback;
86 
87     // If the future is cancelled, tell the TransferEventHandler to cancel the
88     // transfer.
89     addListener(() -> {
90       if (isCancelled()) {
91         eventHandler.cancelTransfer(this);
92       }
93     }, directExecutor());
94 
95     if (desiredProtocolVersion == ProtocolVersion.LEGACY) {
96       // Legacy transfers skip protocol negotiation stage and use the resource ID as
97       // the session ID.
98       configuredProtocolVersion = ProtocolVersion.LEGACY;
99       state = getWaitingForDataState();
100     } else {
101       state = new Initiating();
102     }
103 
104     startTime = Instant.now();
105   }
106 
107   @Override
toString()108   public String toString() {
109     return String.format(Locale.ENGLISH,
110         "%s(%d:%d)[%s]",
111         this.getClass().getSimpleName(),
112         resourceId,
113         sessionId,
114         state.getClass().getSimpleName());
115   }
116 
getResourceId()117   public final int getResourceId() {
118     return resourceId;
119   }
120 
getSessionId()121   public final int getSessionId() {
122     return sessionId;
123   }
124 
getOffset()125   public final int getOffset() {
126     return offset;
127   }
128 
getDesiredProtocolVersion()129   final ProtocolVersion getDesiredProtocolVersion() {
130     return desiredProtocolVersion;
131   }
132 
133   /** Terminates the transfer without sending any packets. */
terminate(TransferError error)134   public final void terminate(TransferError error) {
135     changeState(new Completed(error));
136   }
137 
getDeadline()138   final Instant getDeadline() {
139     return deadline;
140   }
141 
setOffset(int offset)142   final void setOffset(int offset) {
143     this.offset = offset;
144   }
145 
setNextChunkTimeout()146   final void setNextChunkTimeout() {
147     deadline = Instant.now().plusMillis(timeoutSettings.timeoutMillis());
148   }
149 
setInitialTimeout()150   private void setInitialTimeout() {
151     deadline = Instant.now().plusMillis(timeoutSettings.initialTimeoutMillis());
152   }
153 
setTimeoutMicros(int timeoutMicros)154   final void setTimeoutMicros(int timeoutMicros) {
155     deadline = Instant.now().plusNanos((long) timeoutMicros * 1000);
156   }
157 
start()158   final void start() {
159     logger.atInfo().log(
160         "%s starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires",
161         this,
162         timeoutSettings.timeoutMillis(),
163         timeoutSettings.initialTimeoutMillis(),
164         timeoutSettings.maxRetries());
165     VersionedChunk.Builder chunk =
166         VersionedChunk.createInitialChunk(desiredProtocolVersion, resourceId, sessionId);
167     prepareInitialChunk(chunk);
168     try {
169       sendChunk(chunk.build());
170     } catch (TransferAbortedException e) {
171       return; // Sending failed, transfer is cancelled
172     }
173     setInitialTimeout();
174   }
175 
176   /** Processes an incoming chunk from the server. */
handleChunk(VersionedChunk chunk)177   final void handleChunk(VersionedChunk chunk) {
178     try {
179       if (chunk.type() == Chunk.Type.COMPLETION) {
180         state.handleFinalChunk(chunk.status().orElseGet(() -> {
181           logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL");
182           return Status.INTERNAL.code();
183         }));
184       } else {
185         state.handleDataChunk(chunk);
186       }
187     } catch (TransferAbortedException e) {
188       // Transfer was aborted; nothing else to do.
189     }
190   }
191 
handleTimeoutIfDeadlineExceeded()192   final void handleTimeoutIfDeadlineExceeded() {
193     if (Instant.now().isAfter(deadline)) {
194       try {
195         state.handleTimeout();
196       } catch (TransferAbortedException e) {
197         // Transfer was aborted; nothing else to do.
198       }
199     }
200   }
201 
handleTermination()202   final void handleTermination() {
203     state.handleTermination();
204   }
205 
handleCancellation()206   final void handleCancellation() {
207     state.handleCancellation();
208   }
209 
210   /** Returns the State to enter immediately after sending the first packet. */
getWaitingForDataState()211   abstract State getWaitingForDataState();
212 
prepareInitialChunk(VersionedChunk.Builder chunk)213   abstract void prepareInitialChunk(VersionedChunk.Builder chunk);
214 
215   /**
216    * Returns the chunk to send for a retry. Returns the initial chunk if no chunks
217    * have been sent.
218    */
getChunkForRetry()219   abstract VersionedChunk getChunkForRetry();
220 
221   /** Sets the result for the future after a successful transfer. */
setFutureResult()222   abstract void setFutureResult();
223 
newChunk(Chunk.Type type)224   final VersionedChunk.Builder newChunk(Chunk.Type type) {
225     return VersionedChunk.builder()
226         .setVersion(configuredProtocolVersion != ProtocolVersion.UNKNOWN ? configuredProtocolVersion
227                                                                          : desiredProtocolVersion)
228         .setType(type)
229         .setSessionId(sessionId)
230         .setResourceId(resourceId);
231   }
232 
getLastChunkSent()233   final VersionedChunk getLastChunkSent() {
234     return lastChunkSent;
235   }
236 
changeState(State newState)237   final State changeState(State newState) {
238     if (newState != state) {
239       logger.atFinest().log("%s state %s -> %s",
240           this,
241           state.getClass().getSimpleName(),
242           newState.getClass().getSimpleName());
243     }
244     state = newState;
245     return state;
246   }
247 
248   /** Exception thrown when the transfer is aborted. */
249   static class TransferAbortedException extends Exception {}
250 
251   /**
252    * Sends a chunk.
253    *
254    * If sending fails, the transfer cannot proceed. sendChunk() sets the state to
255    * completed and
256    * throws a TransferAbortedException.
257    */
sendChunk(VersionedChunk chunk)258   final void sendChunk(VersionedChunk chunk) throws TransferAbortedException {
259     lastChunkSent = chunk;
260     if (shouldAbortCallback.getAsBoolean()) {
261       logger.atWarning().log("Abort signal received.");
262       changeState(new Completed(new TransferError(this, Status.ABORTED)));
263       throw new TransferAbortedException();
264     }
265 
266     try {
267       if (VERBOSE_LOGGING) {
268         logger.atFinest().log("%s sending %s", this, chunk);
269       }
270       eventHandler.sendChunk(chunk.toMessage());
271     } catch (TransferError transferError) {
272       changeState(new Completed(transferError));
273       throw new TransferAbortedException();
274     }
275   }
276 
277   /** Sends a status chunk to the server and finishes the transfer. */
setStateTerminatingAndSendFinalChunk(Status status)278   final void setStateTerminatingAndSendFinalChunk(Status status) throws TransferAbortedException {
279     logger.atFine().log("%s sending final chunk with status %s", this, status);
280     sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build());
281     if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) {
282       changeState(new Terminating(status));
283     } else {
284       changeState(new Completed(status));
285     }
286   }
287 
288   /** Invokes the transfer progress callback and logs the progress. */
updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes)289   final void updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes) {
290     TransferProgress progress =
291         TransferProgress.create(bytesSent, bytesConfirmedReceived, totalSizeBytes);
292     progressCallback.accept(progress);
293 
294     long durationNanos = Duration.between(startTime, Instant.now()).toNanos();
295     long totalRate = durationNanos == 0 ? 0 : (bytesSent * 1_000_000_000 / durationNanos);
296 
297     logger.atFiner().log("%s progress: "
298             + "%5.1f%% (%d B sent, %d B confirmed received of %s B total) at %d B/s",
299         this,
300         progress.percentReceived(),
301         bytesSent,
302         bytesConfirmedReceived,
303         totalSizeBytes == UNKNOWN_TRANSFER_SIZE ? "unknown" : totalSizeBytes,
304         totalRate);
305   }
306 
307   interface State {
308     /**
309      * Called to handle a non-final chunk for this transfer.
310      */
handleDataChunk(VersionedChunk chunk)311     void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException;
312 
313     /**
314      * Called to handle the final chunk for this transfer.
315      */
handleFinalChunk(int statusCode)316     void handleFinalChunk(int statusCode) throws TransferAbortedException;
317 
318     /**
319      * Called when this transfer's deadline expires.
320      */
handleTimeout()321     void handleTimeout() throws TransferAbortedException;
322 
323     /**
324      * Called if the transfer is cancelled by the user.
325      */
handleCancellation()326     void handleCancellation();
327 
328     /**
329      * Called when the transfer thread is shutting down.
330      */
handleTermination()331     void handleTermination();
332   }
333 
334   /** Represents an active state in the transfer state machine. */
335   abstract class ActiveState implements State {
336     @Override
handleFinalChunk(int statusCode)337     public final void handleFinalChunk(int statusCode) throws TransferAbortedException {
338       Status status = Status.fromCode(statusCode);
339       if (status == null) {
340         logger.atWarning().log("Received invalid status value %d, using INTERNAL", statusCode);
341         status = Status.INTERNAL;
342       }
343 
344       // If this is not version 2, immediately clean up. If it is, send the
345       // COMPLETION_ACK first and
346       // clean up if that succeeded.
347       if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) {
348         sendChunk(newChunk(Chunk.Type.COMPLETION_ACK).build());
349       }
350       changeState(new Completed(status));
351     }
352 
353     /** Enters the recovery state and returns to this state if recovery succeeds. */
354     @Override
handleTimeout()355     public void handleTimeout() throws TransferAbortedException {
356       changeState(new TimeoutRecovery(this)).handleTimeout();
357     }
358 
359     @Override
handleCancellation()360     public final void handleCancellation() {
361       try {
362         setStateTerminatingAndSendFinalChunk(Status.CANCELLED);
363       } catch (TransferAbortedException e) {
364         // Transfer was aborted; nothing to do.
365       }
366     }
367 
368     @Override
handleTermination()369     public final void handleTermination() {
370       try {
371         setStateTerminatingAndSendFinalChunk(Status.ABORTED);
372       } catch (TransferAbortedException e) {
373         // Transfer was aborted; nothing to do.
374       }
375     }
376   }
377 
378   private class Initiating extends ActiveState {
379     @Override
handleDataChunk(VersionedChunk chunk)380     public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
381       if (chunk.version() == ProtocolVersion.UNKNOWN) {
382         logger.atWarning().log(
383             "%s aborting due to unsupported protocol version: %s", Transfer.this, chunk);
384         setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT);
385         return;
386       }
387 
388       changeState(getWaitingForDataState());
389 
390       if (chunk.type() != Chunk.Type.START_ACK) {
391         if (offset != 0) {
392           logger.atWarning().log(
393               "%s aborting due to unsupported non-zero offset transfer: %s", Transfer.this, chunk);
394           setStateTerminatingAndSendFinalChunk(Status.INTERNAL);
395           return;
396         }
397         logger.atFine().log(
398             "%s got non-handshake chunk; reverting to legacy protocol", Transfer.this);
399         configuredProtocolVersion = ProtocolVersion.LEGACY;
400         state.handleDataChunk(chunk);
401         return;
402       }
403 
404       if (chunk.version().compareTo(desiredProtocolVersion) <= 0) {
405         configuredProtocolVersion = chunk.version();
406       } else {
407         configuredProtocolVersion = desiredProtocolVersion;
408       }
409 
410       logger.atFine().log("%s negotiated protocol %s (ours=%s, theirs=%s)",
411           Transfer.this,
412           configuredProtocolVersion,
413           desiredProtocolVersion,
414           chunk.version());
415 
416       if (offset != chunk.initialOffset()) {
417         logger.atWarning().log(
418             "%s aborting due to unconfirmed non-zero offset transfer: %s", Transfer.this, chunk);
419         setStateTerminatingAndSendFinalChunk(Status.UNIMPLEMENTED);
420         return;
421       }
422 
423       VersionedChunk.Builder startAckConfirmation = newChunk(Chunk.Type.START_ACK_CONFIRMATION);
424       prepareInitialChunk(startAckConfirmation);
425       sendChunk(startAckConfirmation.build());
426     }
427   }
428 
429   /** Recovering from an expired timeout. */
430   class TimeoutRecovery extends ActiveState {
431     private final State nextState;
432     private int retries;
433 
TimeoutRecovery(State nextState)434     TimeoutRecovery(State nextState) {
435       this.nextState = nextState;
436     }
437 
438     @Override
handleDataChunk(VersionedChunk chunk)439     public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
440       changeState(nextState).handleDataChunk(chunk);
441     }
442 
443     @Override
handleTimeout()444     public void handleTimeout() throws TransferAbortedException {
445       // If the transfer timed out, skip to the completed state. Don't send any more
446       // packets.
447       if (retries >= timeoutSettings.maxRetries()) {
448         logger.atFine().log("%s exhausted its %d retries", Transfer.this, retries);
449         changeState(new Completed(Status.DEADLINE_EXCEEDED));
450         return;
451       }
452 
453       if (lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) {
454         logger.atFine().log("%s exhausted its %d lifetime retries", Transfer.this, retries);
455         changeState(new Completed(Status.DEADLINE_EXCEEDED));
456         return;
457       }
458 
459       logger.atFiner().log("%s received no chunks for %d ms; retrying %d/%d",
460           Transfer.this,
461           timeoutSettings.timeoutMillis(),
462           retries,
463           timeoutSettings.maxRetries());
464       VersionedChunk retryChunk = getChunkForRetry();
465       sendChunk(retryChunk);
466       retries += 1;
467       lifetimeRetries += 1;
468       if (retryChunk.type() == Chunk.Type.START) {
469         setInitialTimeout();
470       } else {
471         setNextChunkTimeout();
472       }
473     }
474   }
475 
476   /**
477    * Transfer completed. Do nothing if the transfer is terminated or cancelled.
478    */
479   class Terminating extends ActiveState {
480     private final Status status;
481     private int retries;
482 
Terminating(Status status)483     Terminating(Status status) {
484       this.status = status;
485     }
486 
487     @Override
handleDataChunk(VersionedChunk chunk)488     public void handleDataChunk(VersionedChunk chunk) {
489       if (chunk.type() == Chunk.Type.COMPLETION_ACK) {
490         changeState(new Completed(status));
491       }
492     }
493 
494     @Override
handleTimeout()495     public void handleTimeout() throws TransferAbortedException {
496       if (retries >= timeoutSettings.maxRetries()
497           || lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) {
498         // Unlike the standard `TimeoutRecovery` state, a `Terminating` transfer should
499         // not fail due to a timeout if no completion ACK is received. It should
500         // instead complete with its existing status.
501         logger.atFine().log(
502             "%s exhausted its %d retries (lifetime %d)", Transfer.this, retries, lifetimeRetries);
503         changeState(new Completed(status));
504         return;
505       }
506 
507       logger.atFiner().log("%s did not receive completion ack for %d ms; retrying %d/%d",
508           Transfer.this,
509           timeoutSettings.timeoutMillis(),
510           retries,
511           timeoutSettings.maxRetries());
512       sendChunk(getChunkForRetry());
513       retries += 1;
514       lifetimeRetries += 1;
515       setNextChunkTimeout();
516     }
517   }
518 
519   class Completed implements State {
520     /**
521      * Performs final cleanup of a completed transfer. No packets are sent to the
522      * server.
523      */
Completed(Status status)524     Completed(Status status) {
525       cleanUp();
526       logger.atInfo().log("%s completed with status %s", Transfer.this, status);
527       if (status.ok()) {
528         setFutureResult();
529       } else {
530         setException(new TransferError(Transfer.this, status));
531       }
532     }
533 
534     /**
535      * Finishes the transfer due to an exception. No packets are sent to the server.
536      */
Completed(TransferError exception)537     Completed(TransferError exception) {
538       cleanUp();
539       logger.atWarning().withCause(exception).log("%s terminated with exception", Transfer.this);
540       setException(exception);
541     }
542 
cleanUp()543     private void cleanUp() {
544       deadline = NO_TIMEOUT;
545       eventHandler.unregisterTransfer(Transfer.this);
546     }
547 
548     @Override
handleDataChunk(VersionedChunk chunk)549     public void handleDataChunk(VersionedChunk chunk) {
550       logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this);
551     }
552 
553     @Override
handleFinalChunk(int statusCode)554     public void handleFinalChunk(int statusCode) {
555       logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this);
556     }
557 
558     @Override
handleTimeout()559     public void handleTimeout() {}
560 
561     @Override
handleTermination()562     public void handleTermination() {}
563 
564     @Override
handleCancellation()565     public void handleCancellation() {}
566   }
567 }
568