1# Copyright 2024 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Classes for read and write transfers.""" 15 16import abc 17import asyncio 18from dataclasses import dataclass 19import enum 20import logging 21import math 22import threading 23from typing import Any, Callable 24 25from pw_status import Status 26from pw_transfer.chunk import Chunk, ProtocolVersion 27 28_LOG = logging.getLogger(__package__) 29 30 31@dataclass(frozen=True) 32class ProgressStats: 33 bytes_sent: int 34 bytes_confirmed_received: int 35 total_size_bytes: int | None 36 37 def percent_received(self) -> float: 38 if self.total_size_bytes is None or self.total_size_bytes == 0: 39 return math.nan 40 41 return self.bytes_confirmed_received / self.total_size_bytes * 100 42 43 def __str__(self) -> str: 44 total = ( 45 str(self.total_size_bytes) if self.total_size_bytes else 'unknown' 46 ) 47 return ( 48 f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, ' 49 f'{self.bytes_confirmed_received} B received of {total} B)' 50 ) 51 52 53ProgressCallback = Callable[[ProgressStats], Any] 54 55 56class _Timer: 57 """A timer which invokes a callback after a certain timeout.""" 58 59 def __init__(self, timeout_s: float, callback: Callable[[], Any]): 60 self.timeout_s = timeout_s 61 self._callback = callback 62 self._task: asyncio.Task[Any] | None = None 63 64 def start(self, timeout_s: float | None = None) -> None: 65 """Starts a new timer. 66 67 If a timer is already running, it is stopped and a new timer started. 68 This can be used to implement watchdog-like behavior, where a callback 69 is invoked after some time without a kick. 70 """ 71 self.stop() 72 timeout_s = self.timeout_s if timeout_s is None else timeout_s 73 self._task = asyncio.create_task(self._run(timeout_s)) 74 75 def stop(self) -> None: 76 """Terminates a running timer.""" 77 if self._task is not None: 78 self._task.cancel() 79 self._task = None 80 81 async def _run(self, timeout_s: float) -> None: 82 await asyncio.sleep(timeout_s) 83 self._task = None 84 self._callback() 85 86 87class Transfer(abc.ABC): 88 """A client-side data transfer through a Manager. 89 90 Subclasses are responsible for implementing all of the logic for their type 91 of transfer, receiving messages from the server and sending the appropriate 92 messages in response. 93 """ 94 95 # pylint: disable=too-many-instance-attributes 96 97 class _State(enum.Enum): 98 # Transfer is starting. The server and client are performing an initial 99 # handshake and negotiating protocol and feature flags. 100 INITIATING = 0 101 102 # Waiting for the other end to send a chunk. 103 WAITING = 1 104 105 # Transmitting a window of data to a receiver. 106 TRANSMITTING = 2 107 108 # Recovering after one or more chunks was dropped in an active transfer. 109 RECOVERY = 3 110 111 # Transfer has completed locally and is waiting for the peer to 112 # acknowledge its final status. Only entered by the terminating side of 113 # the transfer. 114 # 115 # The context remains in a TERMINATING state until it receives an 116 # acknowledgement from the peer or times out. 117 TERMINATING = 4 118 119 # A transfer has fully completed. 120 COMPLETE = 5 121 122 def __init__( # pylint: disable=too-many-arguments 123 self, 124 session_id: int, 125 resource_id: int, 126 send_chunk: Callable[[Chunk], None], 127 end_transfer: Callable[['Transfer'], None], 128 response_timeout_s: float, 129 initial_response_timeout_s: float, 130 max_retries: int, 131 max_lifetime_retries: int, 132 protocol_version: ProtocolVersion, 133 progress_callback: ProgressCallback | None = None, 134 initial_offset: int = 0, 135 ): 136 self.status = Status.OK 137 self.done = threading.Event() 138 139 self._session_id = session_id 140 self._resource_id = resource_id 141 self._offset = initial_offset 142 143 self._send_chunk_fn = send_chunk 144 self._end_transfer = end_transfer 145 146 self._desired_protocol_version = protocol_version 147 self._configured_protocol_version = ProtocolVersion.UNKNOWN 148 149 if self._desired_protocol_version is ProtocolVersion.LEGACY: 150 # In a legacy transfer, there is no protocol negotiation stage. 151 # Automatically configure the context to run the legacy protocol and 152 # proceed to waiting for a chunk. 153 self._configured_protocol_version = ProtocolVersion.LEGACY 154 self._state = Transfer._State.WAITING 155 self._session_id = self._resource_id 156 else: 157 self._state = Transfer._State.INITIATING 158 159 self._last_chunk: Chunk | None = None 160 161 self._retries = 0 162 self._max_retries = max_retries 163 self._lifetime_retries = 0 164 self._max_lifetime_retries = max_lifetime_retries 165 self._response_timer = _Timer(response_timeout_s, self._on_timeout) 166 self._initial_response_timeout_s = initial_response_timeout_s 167 168 self._progress_callback = progress_callback 169 170 async def begin(self) -> None: 171 """Sends the initial chunk of the transfer.""" 172 173 if ( 174 self._desired_protocol_version is ProtocolVersion.UNKNOWN 175 or self._desired_protocol_version.value 176 > ProtocolVersion.LATEST.value 177 ): 178 _LOG.error( 179 'Cannot start a transfer with unsupported protocol version %d', 180 self._desired_protocol_version.value, 181 ) 182 self.finish(Status.INVALID_ARGUMENT) 183 return 184 185 initial_chunk = Chunk( 186 self._desired_protocol_version, 187 Chunk.Type.START, 188 resource_id=self._resource_id, 189 ) 190 191 if self._offset != 0: 192 initial_chunk.initial_offset = self._offset 193 194 if self._desired_protocol_version is ProtocolVersion.VERSION_TWO: 195 initial_chunk.desired_session_id = self._session_id 196 197 # Regardless of the desired protocol version, set any additional fields 198 # on the opening chunk, in case the server only runs legacy. 199 self._set_initial_chunk_fields(initial_chunk) 200 201 self._send_chunk(initial_chunk) 202 self._response_timer.start(self._initial_response_timeout_s) 203 204 @property 205 def id(self) -> int: 206 """Returns the identifier for the active transfer.""" 207 return self._session_id 208 209 @property 210 def resource_id(self) -> int: 211 """Returns the identifier of the resource being transferred.""" 212 return self._resource_id 213 214 @property 215 @abc.abstractmethod 216 def data(self) -> bytes: 217 """Returns the data read or written in this transfer.""" 218 219 @abc.abstractmethod 220 def _set_initial_chunk_fields(self, chunk: Chunk) -> None: 221 """Sets fields for the initial non-handshake chunk of the transfer.""" 222 223 def _send_chunk(self, chunk: Chunk) -> None: 224 """Sends a chunk to the server, keeping track of the last chunk sent.""" 225 self._send_chunk_fn(chunk) 226 self._last_chunk = chunk 227 228 async def handle_chunk(self, chunk: Chunk) -> None: 229 """Processes an incoming chunk from the server. 230 231 Handles terminating chunks (i.e. those with a status) and forwards 232 non-terminating chunks to handle_data_chunk. 233 """ 234 self._response_timer.stop() 235 self._retries = 0 # Received data from service, so reset the retries. 236 237 _LOG.debug('Received chunk\n%s', str(chunk.to_message()).rstrip()) 238 239 # Status chunks are only used to terminate a transfer. They do not 240 # contain any data that requires processing. 241 if chunk.status is not None: 242 if self._configured_protocol_version is ProtocolVersion.VERSION_TWO: 243 self._send_chunk( 244 Chunk( 245 self._configured_protocol_version, 246 Chunk.Type.COMPLETION_ACK, 247 session_id=self._session_id, 248 ) 249 ) 250 251 self.finish(Status(chunk.status)) 252 return 253 254 if self._state is Transfer._State.INITIATING: 255 await self._perform_initial_handshake(chunk) 256 elif self._state is Transfer._State.TERMINATING: 257 if chunk.type is Chunk.Type.COMPLETION_ACK: 258 self.finish(self.status) 259 else: 260 # Expecting a completion ACK but didn't receive one. Go through 261 # the retry process. 262 self._on_timeout() 263 # Only ignoring START_ACK, tests were unhappy with other non-data chunks 264 elif chunk.type not in [Chunk.Type.START_ACK]: 265 await self._handle_data_chunk(chunk) 266 else: 267 _LOG.warning("Ignoring extra START_ACK chunk") 268 return 269 270 # Start the timeout for the server to send a chunk in response. 271 self._response_timer.start() 272 273 async def _perform_initial_handshake(self, chunk: Chunk) -> None: 274 """Progresses the initial handshake phase of a v2+ transfer.""" 275 assert self._state is Transfer._State.INITIATING 276 277 # If a non-handshake chunk is received during an INITIATING state, the 278 # transfer server is running a legacy protocol version, which does not 279 # perform a handshake. End the handshake, revert to the legacy protocol, 280 # and process the chunk appropriately. 281 if chunk.type is not Chunk.Type.START_ACK: 282 _LOG.debug( 283 'Transfer %d got non-handshake chunk, reverting to legacy', 284 self.id, 285 ) 286 287 if self._offset != 0: 288 _LOG.error( 289 'Non-zero offset transfers not supported by legacy protocol' 290 ) 291 self.finish(Status.INTERNAL) 292 return 293 294 self._configured_protocol_version = ProtocolVersion.LEGACY 295 self._state = Transfer._State.WAITING 296 297 # Update the transfer's session ID, which will map to the 298 # transfer_id of the legacy chunk. 299 self._session_id = chunk.session_id 300 301 await self._handle_data_chunk(chunk) 302 return 303 304 self._configured_protocol_version = ProtocolVersion( 305 min( 306 self._desired_protocol_version.value, 307 chunk.protocol_version.value, 308 ) 309 ) 310 _LOG.debug( 311 'Transfer %d negotiating protocol version: ours=%d, theirs=%d', 312 self.id, 313 self._desired_protocol_version.value, 314 chunk.protocol_version.value, 315 ) 316 317 if self._offset != chunk.initial_offset: 318 # If our offsets don't match, let user handle it 319 self.finish(Status.UNIMPLEMENTED) 320 return 321 322 # Send a confirmation chunk to the server accepting the assigned session 323 # ID and protocol version. Tag any initial transfer parameters onto the 324 # chunk to begin the data transfer. 325 start_ack_confirmation = Chunk( 326 self._configured_protocol_version, 327 Chunk.Type.START_ACK_CONFIRMATION, 328 session_id=self._session_id, 329 offset=self._offset, 330 ) 331 332 self._set_initial_chunk_fields(start_ack_confirmation) 333 334 self._state = Transfer._State.WAITING 335 self._send_chunk(start_ack_confirmation) 336 337 @abc.abstractmethod 338 async def _handle_data_chunk(self, chunk: Chunk) -> None: 339 """Handles a chunk that contains or requests data.""" 340 341 @abc.abstractmethod 342 def _retry_after_data_timeout(self) -> None: 343 """Retries after a timeout occurs during the data transfer phase. 344 345 Only invoked when in the data transfer phase (i.e. state is in 346 {WAITING, TRANSMITTING, RECOVERY}). Timeouts occurring during an 347 opening or closing handshake are handled by the base Transfer. 348 """ 349 350 def _on_timeout(self) -> None: 351 """Handles a timeout while waiting for a chunk.""" 352 if self._state is Transfer._State.COMPLETE: 353 return 354 355 self._retries += 1 356 self._lifetime_retries += 1 357 358 if ( 359 self._retries > self._max_retries 360 or self._lifetime_retries > self._max_lifetime_retries 361 ): 362 if self._state is Transfer._State.TERMINATING: 363 # If the server never responded to the sent completion chunk, 364 # simply end the transfer locally with its original status. 365 self.finish(self.status) 366 else: 367 self.finish(Status.DEADLINE_EXCEEDED) 368 return 369 370 _LOG.debug( 371 'Received no responses for %.3fs; retrying %d/%d', 372 self._response_timer.timeout_s, 373 self._retries, 374 self._max_retries, 375 ) 376 377 retry_handshake_chunk = self._state in ( 378 Transfer._State.INITIATING, 379 Transfer._State.TERMINATING, 380 ) or ( 381 self._last_chunk is not None 382 and self._last_chunk.type is Chunk.Type.START_ACK_CONFIRMATION 383 ) 384 385 if retry_handshake_chunk: 386 assert self._last_chunk is not None 387 self._send_chunk(self._last_chunk) 388 else: 389 self._retry_after_data_timeout() 390 391 if ( 392 self._last_chunk is not None 393 and self._last_chunk.type is Chunk.Type.START 394 ): 395 self._response_timer.start(self._initial_response_timeout_s) 396 else: 397 self._response_timer.start() 398 399 def finish(self, status: Status, skip_callback: bool = False) -> None: 400 """Ends the transfer with the specified status.""" 401 self._response_timer.stop() 402 self.status = status 403 404 if status.ok(): 405 total_size = len(self.data) 406 self._update_progress(total_size, total_size, total_size) 407 408 if not skip_callback: 409 self._end_transfer(self) 410 411 # Set done last so that the transfer has been fully cleaned up. 412 self._state = Transfer._State.COMPLETE 413 self.done.set() 414 415 def _update_progress( 416 self, 417 bytes_sent: int, 418 bytes_confirmed_received: int, 419 total_size_bytes: int | None, 420 ) -> None: 421 """Invokes the provided progress callback, if any, with the progress.""" 422 423 stats = ProgressStats( 424 bytes_sent, bytes_confirmed_received, total_size_bytes 425 ) 426 _LOG.debug('Transfer %d progress: %s', self.id, stats) 427 428 if self._progress_callback: 429 self._progress_callback(stats) 430 431 def _send_final_chunk(self, status: Status) -> None: 432 """Sends a status chunk to the server and finishes the transfer.""" 433 self._send_chunk( 434 Chunk( 435 self._configured_protocol_version, 436 Chunk.Type.COMPLETION, 437 session_id=self.id, 438 status=status, 439 ) 440 ) 441 442 if self._configured_protocol_version is ProtocolVersion.VERSION_TWO: 443 # Wait for a completion ACK from the server. 444 self.status = status 445 self._state = Transfer._State.TERMINATING 446 self._response_timer.start() 447 else: 448 self.finish(status) 449 450 451class WriteTransfer(Transfer): 452 """A client -> server write transfer.""" 453 454 def __init__( # pylint: disable=too-many-arguments 455 self, 456 session_id: int, 457 resource_id: int, 458 data: bytes, 459 send_chunk: Callable[[Chunk], None], 460 end_transfer: Callable[[Transfer], None], 461 response_timeout_s: float, 462 initial_response_timeout_s: float, 463 max_retries: int, 464 max_lifetime_retries: int, 465 protocol_version: ProtocolVersion, 466 progress_callback: ProgressCallback | None = None, 467 initial_offset: int = 0, 468 ): 469 super().__init__( 470 session_id, 471 resource_id, 472 send_chunk, 473 end_transfer, 474 response_timeout_s, 475 initial_response_timeout_s, 476 max_retries, 477 max_lifetime_retries, 478 protocol_version, 479 progress_callback, 480 initial_offset=initial_offset, 481 ) 482 self._data = data 483 self.initial_offset = initial_offset 484 485 self._window_end_offset = 0 486 self._max_chunk_size = 0 487 self._chunk_delay_us: int | None = None 488 489 # The window ID increments for each parameters update. 490 self._window_id = 0 491 492 self._bytes_confirmed_received = 0 493 494 @property 495 def data(self) -> bytes: 496 return self._data 497 498 def _set_initial_chunk_fields(self, chunk: Chunk) -> None: 499 # Nothing to tag onto the initial chunk in a write transfer. 500 pass 501 502 async def _handle_data_chunk(self, chunk: Chunk) -> None: 503 """Processes an incoming chunk from the server. 504 505 In a write transfer, the server only sends transfer parameter updates 506 to the client. When a message is received, update local parameters and 507 send data accordingly. 508 """ 509 510 if self._state is Transfer._State.TRANSMITTING: 511 self._state = Transfer._State.WAITING 512 513 assert self._state is Transfer._State.WAITING 514 515 if not self._handle_parameters_update(chunk): 516 return 517 518 self._bytes_confirmed_received = chunk.offset 519 self._state = Transfer._State.TRANSMITTING 520 521 self._window_id += 1 522 asyncio.create_task(self._transmit_next_chunk(self._window_id)) 523 524 async def _transmit_next_chunk( 525 self, window_id: int, timeout_us: int | None = None 526 ) -> None: 527 """Transmits a single data chunk to the server. 528 529 If the chunk completes the active window, returns to a WAITING state. 530 Otherwise, schedules another transmission for the next chunk. 531 """ 532 if timeout_us is not None: 533 await asyncio.sleep(timeout_us / 1e6) 534 535 if self._state is not Transfer._State.TRANSMITTING: 536 return 537 538 if window_id != self._window_id: 539 _LOG.debug('Transfer %d: Skipping stale window', self.id) 540 return 541 542 chunk = self._next_chunk() 543 self._offset += len(chunk.data) 544 545 sent_requested_bytes = self._offset == self._window_end_offset 546 547 self._send_chunk(chunk) 548 self._update_progress( 549 self._offset, 550 self._bytes_confirmed_received, 551 len(self.data) + self.initial_offset, 552 ) 553 554 if sent_requested_bytes: 555 self._state = Transfer._State.WAITING 556 else: 557 asyncio.create_task( 558 self._transmit_next_chunk( 559 window_id, timeout_us=self._chunk_delay_us 560 ) 561 ) 562 563 def _handle_parameters_update(self, chunk: Chunk) -> bool: 564 """Updates transfer state based on a transfer parameters update.""" 565 566 if chunk.offset > len(self.data) + self.initial_offset: 567 # Bad offset; terminate the transfer. 568 _LOG.error( 569 'Transfer %d: server requested invalid offset %d (size %d)', 570 self.id, 571 chunk.offset, 572 len(self.data) + self.initial_offset, 573 ) 574 575 self._send_final_chunk(Status.OUT_OF_RANGE) 576 return False 577 578 if chunk.offset == chunk.window_end_offset: 579 _LOG.error( 580 'Transfer %d: service requested 0 bytes (invalid); aborting', 581 self.id, 582 ) 583 self._send_final_chunk(Status.INTERNAL) 584 return False 585 586 # Extend the window to the new end offset specified by the server. 587 self._window_end_offset = min( 588 chunk.window_end_offset, len(self.data) + self.initial_offset 589 ) 590 591 if chunk.requests_transmission_from_offset(): 592 # Check whether the client has sent a previous data offset, which 593 # indicates that some chunks were lost in transmission. 594 if chunk.offset < self._offset: 595 _LOG.debug( 596 'Write transfer %d rolling back: offset %d from %d', 597 self.id, 598 chunk.offset, 599 self._offset, 600 ) 601 602 self._offset = chunk.offset 603 604 if chunk.max_chunk_size_bytes is not None: 605 self._max_chunk_size = chunk.max_chunk_size_bytes 606 607 if chunk.min_delay_microseconds is not None: 608 self._chunk_delay_us = chunk.min_delay_microseconds 609 610 return True 611 612 def _retry_after_data_timeout(self) -> None: 613 if ( 614 self._state is Transfer._State.WAITING 615 and self._last_chunk is not None 616 ): 617 self._send_chunk(self._last_chunk) 618 619 def _next_chunk(self) -> Chunk: 620 """Returns the next Chunk message to send in the data transfer.""" 621 chunk = Chunk( 622 self._configured_protocol_version, 623 Chunk.Type.DATA, 624 session_id=self.id, 625 offset=self._offset, 626 ) 627 628 max_bytes_in_chunk = min( 629 self._max_chunk_size, self._window_end_offset - self._offset 630 ) 631 chunk.data = self.data[ 632 self._offset 633 - self.initial_offset : self._offset 634 - self.initial_offset 635 + max_bytes_in_chunk 636 ] 637 638 # Mark the final chunk of the transfer. 639 if ( 640 len(self.data) - self._offset + self.initial_offset 641 <= max_bytes_in_chunk 642 ): 643 chunk.remaining_bytes = 0 644 645 return chunk 646 647 648class ReadTransfer(Transfer): 649 """A client <- server read transfer. 650 651 Although Python can effectively handle an unlimited transfer window, this 652 client sets a conservative window and chunk size to avoid overloading the 653 device. These are configurable in the constructor. 654 """ 655 656 # pylint: disable=too-many-instance-attributes 657 658 # The fractional position within a window at which a receive transfer should 659 # extend its window size to minimize the amount of time the transmitter 660 # spends blocked. 661 # 662 # For example, a divisor of 2 will extend the window when half of the 663 # requested data has been received, a divisor of three will extend at a 664 # third of the window, and so on. 665 EXTEND_WINDOW_DIVISOR = 2 666 667 # Slow start and congestion avoidance are analogues to the equally named 668 # phases in TCP congestion control. 669 class _TransmitPhase(enum.Enum): 670 SLOW_START = 0 671 CONGESTION_AVOIDANCE = 1 672 673 # The type of data transmission the transfer is requesting. 674 class _TransmitAction(enum.Enum): 675 # Immediate parameters sent at the start of a new transfer for legacy 676 # compatibility. 677 BEGIN = 0 678 679 # Initial parameters chunk following the opening handshake. 680 FIRST_PARAMETERS = 1 681 682 # Extend the current transmission window. 683 EXTEND = 2 684 685 # Rewind the transfer to a certain offset following data loss. 686 RETRANSMIT = 3 687 688 def __init__( # pylint: disable=too-many-arguments 689 self, 690 session_id: int, 691 resource_id: int, 692 send_chunk: Callable[[Chunk], None], 693 end_transfer: Callable[[Transfer], None], 694 response_timeout_s: float, 695 initial_response_timeout_s: float, 696 max_retries: int, 697 max_lifetime_retries: int, 698 protocol_version: ProtocolVersion, 699 max_window_size_bytes: int = 32768, 700 max_chunk_size: int = 1024, 701 chunk_delay_us: int | None = None, 702 progress_callback: ProgressCallback | None = None, 703 initial_offset: int = 0, 704 ): 705 super().__init__( 706 session_id, 707 resource_id, 708 send_chunk, 709 end_transfer, 710 response_timeout_s, 711 initial_response_timeout_s, 712 max_retries, 713 max_lifetime_retries, 714 protocol_version, 715 progress_callback, 716 initial_offset=initial_offset, 717 ) 718 self._max_window_size_bytes = max_window_size_bytes 719 self._max_chunk_size = max_chunk_size 720 self._chunk_delay_us = chunk_delay_us 721 722 self._remaining_transfer_size: int | None = None 723 self._data = bytearray() 724 self._window_end_offset = max_chunk_size 725 self._window_size_multiplier = 1 726 self._window_size = self._max_chunk_size * self._window_size_multiplier 727 self._transmit_phase = ReadTransfer._TransmitPhase.SLOW_START 728 self._last_chunk_offset: int | None = None 729 730 @property 731 def data(self) -> bytes: 732 """Returns an immutable copy of the data that has been read.""" 733 return bytes(self._data) 734 735 def _set_initial_chunk_fields(self, chunk: Chunk) -> None: 736 self._update_and_set_transfer_parameters( 737 chunk, ReadTransfer._TransmitAction.BEGIN 738 ) 739 740 async def _handle_data_chunk(self, chunk: Chunk) -> None: 741 """Processes an incoming chunk from the server. 742 743 In a read transfer, the client receives data chunks from the server. 744 Once all pending data is received, the transfer parameters are updated. 745 """ 746 747 if self._state is Transfer._State.RECOVERY: 748 if chunk.offset != self._offset: 749 if self._last_chunk_offset == chunk.offset: 750 _LOG.debug( 751 'Transfer %d received repeated offset %d: ' 752 'retry detected, resending transfer parameters', 753 self.id, 754 chunk.offset, 755 ) 756 self._send_chunk( 757 self._transfer_parameters( 758 ReadTransfer._TransmitAction.RETRANSMIT 759 ) 760 ) 761 else: 762 _LOG.debug( 763 'Transfer %d waiting for offset %d, ignoring %d', 764 self.id, 765 self._offset, 766 chunk.offset, 767 ) 768 self._last_chunk_offset = chunk.offset 769 return 770 771 _LOG.info( 772 'Transfer %d received expected offset %d, resuming transfer', 773 self.id, 774 chunk.offset, 775 ) 776 self._state = Transfer._State.WAITING 777 778 assert self._state is Transfer._State.WAITING 779 780 if chunk.offset != self._offset: 781 if chunk.offset + len(chunk.data) <= self._offset: 782 # If the chunk's data has already been received, don't go 783 # through a full recovery cycle to avoid shrinking the window 784 # size and potentially thrashing. The expected data may already 785 # be in-flight, so just allow the transmitter to keep going with 786 # a CONTINUE parameters chunk. 787 _LOG.debug( 788 'Transfer %d received duplicate chunk with offset %d', 789 self.id, 790 chunk.offset, 791 ) 792 self._send_chunk( 793 self._transfer_parameters( 794 ReadTransfer._TransmitAction.EXTEND, 795 update=False, 796 ) 797 ) 798 else: 799 # Initially, the transfer service only supports in-order 800 # transfers. If data is received out of order, request that the 801 # server retransmit from the previous offset. 802 _LOG.debug( 803 'Transfer %d expected offset %d, received %d: ' 804 'entering recovery state', 805 self.id, 806 self._offset, 807 chunk.offset, 808 ) 809 self._state = Transfer._State.RECOVERY 810 811 self._send_chunk( 812 self._transfer_parameters( 813 ReadTransfer._TransmitAction.RETRANSMIT 814 ) 815 ) 816 return 817 818 self._data += chunk.data 819 self._offset += len(chunk.data) 820 821 # Update the last offset seen so that retries can be detected. 822 self._last_chunk_offset = chunk.offset 823 824 if chunk.remaining_bytes is not None: 825 if chunk.remaining_bytes == 0: 826 # No more data to read. Acknowledge receipt and finish. 827 self._send_final_chunk(Status.OK) 828 return 829 830 # The server may indicate if the amount of remaining data is known. 831 self._remaining_transfer_size = chunk.remaining_bytes 832 elif self._remaining_transfer_size is not None: 833 # Update the remaining transfer size, if it is known. 834 self._remaining_transfer_size -= len(chunk.data) 835 836 # If the transfer size drops to zero, the estimate was inaccurate. 837 if self._remaining_transfer_size <= 0: 838 self._remaining_transfer_size = None 839 840 total_size = ( 841 None 842 if self._remaining_transfer_size is None 843 else (self._remaining_transfer_size + self._offset) 844 ) 845 self._update_progress(self._offset, self._offset, total_size) 846 847 if chunk.window_end_offset != 0: 848 if chunk.window_end_offset < self._offset: 849 _LOG.error( 850 'Transfer %d: transmitter sent invalid earlier end offset ' 851 '%d (receiver offset %d)', 852 self.id, 853 chunk.window_end_offset, 854 self._offset, 855 ) 856 self._send_final_chunk(Status.INTERNAL) 857 return 858 859 if chunk.window_end_offset > self._window_end_offset: 860 _LOG.error( 861 'Transfer %d: transmitter sent invalid later end offset ' 862 '%d (receiver end offset %d)', 863 self.id, 864 chunk.window_end_offset, 865 self._window_end_offset, 866 ) 867 self._send_final_chunk(Status.INTERNAL) 868 return 869 870 self._window_end_offset = chunk.window_end_offset 871 872 if self._offset == self._window_end_offset: 873 # All pending data was received. Send out a new parameters chunk for 874 # the next block. 875 self._send_chunk( 876 self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND) 877 ) 878 return 879 880 remaining_window_size = self._window_end_offset - self._offset 881 extend_window = ( 882 remaining_window_size 883 <= self._window_size / ReadTransfer.EXTEND_WINDOW_DIVISOR 884 ) 885 886 if extend_window: 887 self._send_chunk( 888 self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND) 889 ) 890 891 def _retry_after_data_timeout(self) -> None: 892 if ( 893 self._state is Transfer._State.WAITING 894 or self._state is Transfer._State.RECOVERY 895 ): 896 self._send_chunk( 897 self._transfer_parameters( 898 ReadTransfer._TransmitAction.RETRANSMIT 899 ) 900 ) 901 902 def _set_transfer_parameters( 903 self, 904 chunk: Chunk, 905 ) -> None: 906 chunk.offset = self._offset 907 chunk.window_end_offset = self._window_end_offset 908 chunk.max_chunk_size_bytes = self._max_chunk_size 909 910 if self._chunk_delay_us: 911 chunk.min_delay_microseconds = self._chunk_delay_us 912 913 def _update_and_set_transfer_parameters( 914 self, chunk: Chunk, action: 'ReadTransfer._TransmitAction' 915 ) -> None: 916 if action is ReadTransfer._TransmitAction.EXTEND: 917 # Window was received successfully without packet loss and should 918 # grow. Double the window size during slow start, or increase it by 919 # a single chunk in congestion avoidance. 920 if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START: 921 self._window_size_multiplier *= 2 922 else: 923 self._window_size_multiplier += 1 924 925 # The window size can never exceed the user-specified maximum bytes. 926 # If it does, reduce the multiplier to the largest size that fits. 927 if ( 928 self._window_size_multiplier * self._max_chunk_size 929 > self._max_window_size_bytes 930 ): 931 self._window_size_multiplier = ( 932 self._max_window_size_bytes // self._max_chunk_size 933 ) 934 935 elif action is ReadTransfer._TransmitAction.RETRANSMIT: 936 # A packet was lost: shrink the window size. Additionally, after the 937 # first packet loss, transition from the slow start to the 938 # congestion avoidance phase of the transfer. 939 if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START: 940 self._transmit_phase = ( 941 ReadTransfer._TransmitPhase.CONGESTION_AVOIDANCE 942 ) 943 self._window_size_multiplier = max( 944 self._window_size_multiplier // 2, 1 945 ) 946 947 self._window_size = min( 948 self._max_chunk_size * self._window_size_multiplier, 949 self._max_window_size_bytes, 950 ) 951 952 self._window_end_offset = self._offset + self._window_size 953 self._set_transfer_parameters(chunk) 954 955 def _transfer_parameters( 956 self, 957 action: 'ReadTransfer._TransmitAction', 958 update: bool = True, 959 ) -> Chunk: 960 """Returns an updated transfer parameters chunk.""" 961 962 if action is ReadTransfer._TransmitAction.BEGIN: 963 chunk_type = Chunk.Type.START 964 elif action is ReadTransfer._TransmitAction.EXTEND: 965 chunk_type = Chunk.Type.PARAMETERS_CONTINUE 966 else: 967 chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT 968 969 chunk = Chunk( 970 self._configured_protocol_version, chunk_type, session_id=self.id 971 ) 972 973 if update: 974 self._update_and_set_transfer_parameters(chunk, action) 975 else: 976 self._set_transfer_parameters(chunk) 977 978 return chunk 979