1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 package dev.pigweed.pw_transfer; 16 17 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 18 import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE; 19 20 import com.google.common.util.concurrent.AbstractFuture; 21 import dev.pigweed.pw_log.Logger; 22 import dev.pigweed.pw_rpc.Status; 23 import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface; 24 import java.time.Duration; 25 import java.time.Instant; 26 import java.util.Locale; 27 import java.util.function.BooleanSupplier; 28 import java.util.function.Consumer; 29 30 /** Base class for tracking the state of a read or write transfer. */ 31 abstract class Transfer<T> extends AbstractFuture<T> { 32 private static final Logger logger = Logger.forClass(Transfer.class); 33 34 // Largest nanosecond instant. Used to block indefinitely when no transfers are 35 // pending. 36 static final Instant NO_TIMEOUT = Instant.ofEpochSecond(0, Long.MAX_VALUE); 37 38 // Whether to output some particularly noisy logs. 39 static final boolean VERBOSE_LOGGING = false; 40 41 private final int resourceId; 42 private final int sessionId; 43 private int offset; 44 private final ProtocolVersion desiredProtocolVersion; 45 private final TransferEventHandler.TransferInterface eventHandler; 46 private final TransferTimeoutSettings timeoutSettings; 47 private final Consumer<TransferProgress> progressCallback; 48 private final BooleanSupplier shouldAbortCallback; 49 private final Instant startTime; 50 51 private ProtocolVersion configuredProtocolVersion = ProtocolVersion.UNKNOWN; 52 private Instant deadline = NO_TIMEOUT; 53 private State state; 54 private VersionedChunk lastChunkSent; 55 56 private int lifetimeRetries = 0; 57 58 /** 59 * Creates a new read or write transfer. 60 * 61 * @param resourceId The resource ID of the transfer 62 * @param desiredProtocolVersion protocol version to request 63 * @param eventHandler Interface to use to send a chunk. 64 * @param timeoutSettings Timeout and retry settings for this transfer. 65 * @param progressCallback Called each time a packet is sent. 66 * @param shouldAbortCallback BooleanSupplier that returns true if a transfer 67 * should be aborted. 68 */ Transfer(int resourceId, int sessionId, ProtocolVersion desiredProtocolVersion, TransferInterface eventHandler, TransferTimeoutSettings timeoutSettings, Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback, int initialOffset)69 Transfer(int resourceId, 70 int sessionId, 71 ProtocolVersion desiredProtocolVersion, 72 TransferInterface eventHandler, 73 TransferTimeoutSettings timeoutSettings, 74 Consumer<TransferProgress> progressCallback, 75 BooleanSupplier shouldAbortCallback, 76 int initialOffset) { 77 this.resourceId = resourceId; 78 this.sessionId = sessionId; 79 this.offset = initialOffset; 80 this.desiredProtocolVersion = desiredProtocolVersion; 81 this.eventHandler = eventHandler; 82 83 this.timeoutSettings = timeoutSettings; 84 this.progressCallback = progressCallback; 85 this.shouldAbortCallback = shouldAbortCallback; 86 87 // If the future is cancelled, tell the TransferEventHandler to cancel the 88 // transfer. 89 addListener(() -> { 90 if (isCancelled()) { 91 eventHandler.cancelTransfer(this); 92 } 93 }, directExecutor()); 94 95 if (desiredProtocolVersion == ProtocolVersion.LEGACY) { 96 // Legacy transfers skip protocol negotiation stage and use the resource ID as 97 // the session ID. 98 configuredProtocolVersion = ProtocolVersion.LEGACY; 99 state = getWaitingForDataState(); 100 } else { 101 state = new Initiating(); 102 } 103 104 startTime = Instant.now(); 105 } 106 107 @Override toString()108 public String toString() { 109 return String.format(Locale.ENGLISH, 110 "%s(%d:%d)[%s]", 111 this.getClass().getSimpleName(), 112 resourceId, 113 sessionId, 114 state.getClass().getSimpleName()); 115 } 116 getResourceId()117 public final int getResourceId() { 118 return resourceId; 119 } 120 getSessionId()121 public final int getSessionId() { 122 return sessionId; 123 } 124 getOffset()125 public final int getOffset() { 126 return offset; 127 } 128 getDesiredProtocolVersion()129 final ProtocolVersion getDesiredProtocolVersion() { 130 return desiredProtocolVersion; 131 } 132 133 /** Terminates the transfer without sending any packets. */ terminate(TransferError error)134 public final void terminate(TransferError error) { 135 changeState(new Completed(error)); 136 } 137 getDeadline()138 final Instant getDeadline() { 139 return deadline; 140 } 141 setOffset(int offset)142 final void setOffset(int offset) { 143 this.offset = offset; 144 } 145 setNextChunkTimeout()146 final void setNextChunkTimeout() { 147 deadline = Instant.now().plusMillis(timeoutSettings.timeoutMillis()); 148 } 149 setInitialTimeout()150 private void setInitialTimeout() { 151 deadline = Instant.now().plusMillis(timeoutSettings.initialTimeoutMillis()); 152 } 153 setTimeoutMicros(int timeoutMicros)154 final void setTimeoutMicros(int timeoutMicros) { 155 deadline = Instant.now().plusNanos((long) timeoutMicros * 1000); 156 } 157 start()158 final void start() { 159 logger.atInfo().log( 160 "%s starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires", 161 this, 162 timeoutSettings.timeoutMillis(), 163 timeoutSettings.initialTimeoutMillis(), 164 timeoutSettings.maxRetries()); 165 VersionedChunk.Builder chunk = 166 VersionedChunk.createInitialChunk(desiredProtocolVersion, resourceId, sessionId); 167 prepareInitialChunk(chunk); 168 try { 169 sendChunk(chunk.build()); 170 } catch (TransferAbortedException e) { 171 return; // Sending failed, transfer is cancelled 172 } 173 setInitialTimeout(); 174 } 175 176 /** Processes an incoming chunk from the server. */ handleChunk(VersionedChunk chunk)177 final void handleChunk(VersionedChunk chunk) { 178 try { 179 if (chunk.type() == Chunk.Type.COMPLETION) { 180 state.handleFinalChunk(chunk.status().orElseGet(() -> { 181 logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL"); 182 return Status.INTERNAL.code(); 183 })); 184 } else { 185 state.handleDataChunk(chunk); 186 } 187 } catch (TransferAbortedException e) { 188 // Transfer was aborted; nothing else to do. 189 } 190 } 191 handleTimeoutIfDeadlineExceeded()192 final void handleTimeoutIfDeadlineExceeded() { 193 if (Instant.now().isAfter(deadline)) { 194 try { 195 state.handleTimeout(); 196 } catch (TransferAbortedException e) { 197 // Transfer was aborted; nothing else to do. 198 } 199 } 200 } 201 handleTermination()202 final void handleTermination() { 203 state.handleTermination(); 204 } 205 handleCancellation()206 final void handleCancellation() { 207 state.handleCancellation(); 208 } 209 210 /** Returns the State to enter immediately after sending the first packet. */ getWaitingForDataState()211 abstract State getWaitingForDataState(); 212 prepareInitialChunk(VersionedChunk.Builder chunk)213 abstract void prepareInitialChunk(VersionedChunk.Builder chunk); 214 215 /** 216 * Returns the chunk to send for a retry. Returns the initial chunk if no chunks 217 * have been sent. 218 */ getChunkForRetry()219 abstract VersionedChunk getChunkForRetry(); 220 221 /** Sets the result for the future after a successful transfer. */ setFutureResult()222 abstract void setFutureResult(); 223 newChunk(Chunk.Type type)224 final VersionedChunk.Builder newChunk(Chunk.Type type) { 225 return VersionedChunk.builder() 226 .setVersion(configuredProtocolVersion != ProtocolVersion.UNKNOWN ? configuredProtocolVersion 227 : desiredProtocolVersion) 228 .setType(type) 229 .setSessionId(sessionId) 230 .setResourceId(resourceId); 231 } 232 getLastChunkSent()233 final VersionedChunk getLastChunkSent() { 234 return lastChunkSent; 235 } 236 changeState(State newState)237 final State changeState(State newState) { 238 if (newState != state) { 239 logger.atFinest().log("%s state %s -> %s", 240 this, 241 state.getClass().getSimpleName(), 242 newState.getClass().getSimpleName()); 243 } 244 state = newState; 245 return state; 246 } 247 248 /** Exception thrown when the transfer is aborted. */ 249 static class TransferAbortedException extends Exception {} 250 251 /** 252 * Sends a chunk. 253 * 254 * If sending fails, the transfer cannot proceed. sendChunk() sets the state to 255 * completed and 256 * throws a TransferAbortedException. 257 */ sendChunk(VersionedChunk chunk)258 final void sendChunk(VersionedChunk chunk) throws TransferAbortedException { 259 lastChunkSent = chunk; 260 if (shouldAbortCallback.getAsBoolean()) { 261 logger.atWarning().log("Abort signal received."); 262 changeState(new Completed(new TransferError(this, Status.ABORTED))); 263 throw new TransferAbortedException(); 264 } 265 266 try { 267 if (VERBOSE_LOGGING) { 268 logger.atFinest().log("%s sending %s", this, chunk); 269 } 270 eventHandler.sendChunk(chunk.toMessage()); 271 } catch (TransferError transferError) { 272 changeState(new Completed(transferError)); 273 throw new TransferAbortedException(); 274 } 275 } 276 277 /** Sends a status chunk to the server and finishes the transfer. */ setStateTerminatingAndSendFinalChunk(Status status)278 final void setStateTerminatingAndSendFinalChunk(Status status) throws TransferAbortedException { 279 logger.atFine().log("%s sending final chunk with status %s", this, status); 280 sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build()); 281 if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) { 282 changeState(new Terminating(status)); 283 } else { 284 changeState(new Completed(status)); 285 } 286 } 287 288 /** Invokes the transfer progress callback and logs the progress. */ updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes)289 final void updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes) { 290 TransferProgress progress = 291 TransferProgress.create(bytesSent, bytesConfirmedReceived, totalSizeBytes); 292 progressCallback.accept(progress); 293 294 long durationNanos = Duration.between(startTime, Instant.now()).toNanos(); 295 long totalRate = durationNanos == 0 ? 0 : (bytesSent * 1_000_000_000 / durationNanos); 296 297 logger.atFiner().log("%s progress: " 298 + "%5.1f%% (%d B sent, %d B confirmed received of %s B total) at %d B/s", 299 this, 300 progress.percentReceived(), 301 bytesSent, 302 bytesConfirmedReceived, 303 totalSizeBytes == UNKNOWN_TRANSFER_SIZE ? "unknown" : totalSizeBytes, 304 totalRate); 305 } 306 307 interface State { 308 /** 309 * Called to handle a non-final chunk for this transfer. 310 */ handleDataChunk(VersionedChunk chunk)311 void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException; 312 313 /** 314 * Called to handle the final chunk for this transfer. 315 */ handleFinalChunk(int statusCode)316 void handleFinalChunk(int statusCode) throws TransferAbortedException; 317 318 /** 319 * Called when this transfer's deadline expires. 320 */ handleTimeout()321 void handleTimeout() throws TransferAbortedException; 322 323 /** 324 * Called if the transfer is cancelled by the user. 325 */ handleCancellation()326 void handleCancellation(); 327 328 /** 329 * Called when the transfer thread is shutting down. 330 */ handleTermination()331 void handleTermination(); 332 } 333 334 /** Represents an active state in the transfer state machine. */ 335 abstract class ActiveState implements State { 336 @Override handleFinalChunk(int statusCode)337 public final void handleFinalChunk(int statusCode) throws TransferAbortedException { 338 Status status = Status.fromCode(statusCode); 339 if (status == null) { 340 logger.atWarning().log("Received invalid status value %d, using INTERNAL", statusCode); 341 status = Status.INTERNAL; 342 } 343 344 // If this is not version 2, immediately clean up. If it is, send the 345 // COMPLETION_ACK first and 346 // clean up if that succeeded. 347 if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) { 348 sendChunk(newChunk(Chunk.Type.COMPLETION_ACK).build()); 349 } 350 changeState(new Completed(status)); 351 } 352 353 /** Enters the recovery state and returns to this state if recovery succeeds. */ 354 @Override handleTimeout()355 public void handleTimeout() throws TransferAbortedException { 356 changeState(new TimeoutRecovery(this)).handleTimeout(); 357 } 358 359 @Override handleCancellation()360 public final void handleCancellation() { 361 try { 362 setStateTerminatingAndSendFinalChunk(Status.CANCELLED); 363 } catch (TransferAbortedException e) { 364 // Transfer was aborted; nothing to do. 365 } 366 } 367 368 @Override handleTermination()369 public final void handleTermination() { 370 try { 371 setStateTerminatingAndSendFinalChunk(Status.ABORTED); 372 } catch (TransferAbortedException e) { 373 // Transfer was aborted; nothing to do. 374 } 375 } 376 } 377 378 private class Initiating extends ActiveState { 379 @Override handleDataChunk(VersionedChunk chunk)380 public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { 381 if (chunk.version() == ProtocolVersion.UNKNOWN) { 382 logger.atWarning().log( 383 "%s aborting due to unsupported protocol version: %s", Transfer.this, chunk); 384 setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT); 385 return; 386 } 387 388 changeState(getWaitingForDataState()); 389 390 if (chunk.type() != Chunk.Type.START_ACK) { 391 if (offset != 0) { 392 logger.atWarning().log( 393 "%s aborting due to unsupported non-zero offset transfer: %s", Transfer.this, chunk); 394 setStateTerminatingAndSendFinalChunk(Status.INTERNAL); 395 return; 396 } 397 logger.atFine().log( 398 "%s got non-handshake chunk; reverting to legacy protocol", Transfer.this); 399 configuredProtocolVersion = ProtocolVersion.LEGACY; 400 state.handleDataChunk(chunk); 401 return; 402 } 403 404 if (chunk.version().compareTo(desiredProtocolVersion) <= 0) { 405 configuredProtocolVersion = chunk.version(); 406 } else { 407 configuredProtocolVersion = desiredProtocolVersion; 408 } 409 410 logger.atFine().log("%s negotiated protocol %s (ours=%s, theirs=%s)", 411 Transfer.this, 412 configuredProtocolVersion, 413 desiredProtocolVersion, 414 chunk.version()); 415 416 if (offset != chunk.initialOffset()) { 417 logger.atWarning().log( 418 "%s aborting due to unconfirmed non-zero offset transfer: %s", Transfer.this, chunk); 419 setStateTerminatingAndSendFinalChunk(Status.UNIMPLEMENTED); 420 return; 421 } 422 423 VersionedChunk.Builder startAckConfirmation = newChunk(Chunk.Type.START_ACK_CONFIRMATION); 424 prepareInitialChunk(startAckConfirmation); 425 sendChunk(startAckConfirmation.build()); 426 } 427 } 428 429 /** Recovering from an expired timeout. */ 430 class TimeoutRecovery extends ActiveState { 431 private final State nextState; 432 private int retries; 433 TimeoutRecovery(State nextState)434 TimeoutRecovery(State nextState) { 435 this.nextState = nextState; 436 } 437 438 @Override handleDataChunk(VersionedChunk chunk)439 public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { 440 changeState(nextState).handleDataChunk(chunk); 441 } 442 443 @Override handleTimeout()444 public void handleTimeout() throws TransferAbortedException { 445 // If the transfer timed out, skip to the completed state. Don't send any more 446 // packets. 447 if (retries >= timeoutSettings.maxRetries()) { 448 logger.atFine().log("%s exhausted its %d retries", Transfer.this, retries); 449 changeState(new Completed(Status.DEADLINE_EXCEEDED)); 450 return; 451 } 452 453 if (lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) { 454 logger.atFine().log("%s exhausted its %d lifetime retries", Transfer.this, retries); 455 changeState(new Completed(Status.DEADLINE_EXCEEDED)); 456 return; 457 } 458 459 logger.atFiner().log("%s received no chunks for %d ms; retrying %d/%d", 460 Transfer.this, 461 timeoutSettings.timeoutMillis(), 462 retries, 463 timeoutSettings.maxRetries()); 464 VersionedChunk retryChunk = getChunkForRetry(); 465 sendChunk(retryChunk); 466 retries += 1; 467 lifetimeRetries += 1; 468 if (retryChunk.type() == Chunk.Type.START) { 469 setInitialTimeout(); 470 } else { 471 setNextChunkTimeout(); 472 } 473 } 474 } 475 476 /** 477 * Transfer completed. Do nothing if the transfer is terminated or cancelled. 478 */ 479 class Terminating extends ActiveState { 480 private final Status status; 481 private int retries; 482 Terminating(Status status)483 Terminating(Status status) { 484 this.status = status; 485 } 486 487 @Override handleDataChunk(VersionedChunk chunk)488 public void handleDataChunk(VersionedChunk chunk) { 489 if (chunk.type() == Chunk.Type.COMPLETION_ACK) { 490 changeState(new Completed(status)); 491 } 492 } 493 494 @Override handleTimeout()495 public void handleTimeout() throws TransferAbortedException { 496 if (retries >= timeoutSettings.maxRetries() 497 || lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) { 498 // Unlike the standard `TimeoutRecovery` state, a `Terminating` transfer should 499 // not fail due to a timeout if no completion ACK is received. It should 500 // instead complete with its existing status. 501 logger.atFine().log( 502 "%s exhausted its %d retries (lifetime %d)", Transfer.this, retries, lifetimeRetries); 503 changeState(new Completed(status)); 504 return; 505 } 506 507 logger.atFiner().log("%s did not receive completion ack for %d ms; retrying %d/%d", 508 Transfer.this, 509 timeoutSettings.timeoutMillis(), 510 retries, 511 timeoutSettings.maxRetries()); 512 sendChunk(getChunkForRetry()); 513 retries += 1; 514 lifetimeRetries += 1; 515 setNextChunkTimeout(); 516 } 517 } 518 519 class Completed implements State { 520 /** 521 * Performs final cleanup of a completed transfer. No packets are sent to the 522 * server. 523 */ Completed(Status status)524 Completed(Status status) { 525 cleanUp(); 526 logger.atInfo().log("%s completed with status %s", Transfer.this, status); 527 if (status.ok()) { 528 setFutureResult(); 529 } else { 530 setException(new TransferError(Transfer.this, status)); 531 } 532 } 533 534 /** 535 * Finishes the transfer due to an exception. No packets are sent to the server. 536 */ Completed(TransferError exception)537 Completed(TransferError exception) { 538 cleanUp(); 539 logger.atWarning().withCause(exception).log("%s terminated with exception", Transfer.this); 540 setException(exception); 541 } 542 cleanUp()543 private void cleanUp() { 544 deadline = NO_TIMEOUT; 545 eventHandler.unregisterTransfer(Transfer.this); 546 } 547 548 @Override handleDataChunk(VersionedChunk chunk)549 public void handleDataChunk(VersionedChunk chunk) { 550 logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this); 551 } 552 553 @Override handleFinalChunk(int statusCode)554 public void handleFinalChunk(int statusCode) { 555 logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this); 556 } 557 558 @Override handleTimeout()559 public void handleTimeout() {} 560 561 @Override handleTermination()562 public void handleTermination() {} 563 564 @Override handleCancellation()565 public void handleCancellation() {} 566 } 567 } 568