xref: /aosp_15_r20/external/pigweed/pw_transfer/py/pw_transfer/transfer.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Classes for read and write transfers."""
15
16import abc
17import asyncio
18from dataclasses import dataclass
19import enum
20import logging
21import math
22import threading
23from typing import Any, Callable
24
25from pw_status import Status
26from pw_transfer.chunk import Chunk, ProtocolVersion
27
# Module-level logger used by every class in this file; it is named after the
# package that contains this module.
_LOG = logging.getLogger(__package__)
29
30
31@dataclass(frozen=True)
32class ProgressStats:
33    bytes_sent: int
34    bytes_confirmed_received: int
35    total_size_bytes: int | None
36
37    def percent_received(self) -> float:
38        if self.total_size_bytes is None or self.total_size_bytes == 0:
39            return math.nan
40
41        return self.bytes_confirmed_received / self.total_size_bytes * 100
42
43    def __str__(self) -> str:
44        total = (
45            str(self.total_size_bytes) if self.total_size_bytes else 'unknown'
46        )
47        return (
48            f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, '
49            f'{self.bytes_confirmed_received} B received of {total} B)'
50        )
51
52
# Signature of a progress callback: it is invoked with a ProgressStats snapshot
# and its return value is ignored.
ProgressCallback = Callable[[ProgressStats], Any]
54
55
56class _Timer:
57    """A timer which invokes a callback after a certain timeout."""
58
59    def __init__(self, timeout_s: float, callback: Callable[[], Any]):
60        self.timeout_s = timeout_s
61        self._callback = callback
62        self._task: asyncio.Task[Any] | None = None
63
64    def start(self, timeout_s: float | None = None) -> None:
65        """Starts a new timer.
66
67        If a timer is already running, it is stopped and a new timer started.
68        This can be used to implement watchdog-like behavior, where a callback
69        is invoked after some time without a kick.
70        """
71        self.stop()
72        timeout_s = self.timeout_s if timeout_s is None else timeout_s
73        self._task = asyncio.create_task(self._run(timeout_s))
74
75    def stop(self) -> None:
76        """Terminates a running timer."""
77        if self._task is not None:
78            self._task.cancel()
79            self._task = None
80
81    async def _run(self, timeout_s: float) -> None:
82        await asyncio.sleep(timeout_s)
83        self._task = None
84        self._callback()
85
86
87class Transfer(abc.ABC):
88    """A client-side data transfer through a Manager.
89
90    Subclasses are responsible for implementing all of the logic for their type
91    of transfer, receiving messages from the server and sending the appropriate
92    messages in response.
93    """
94
95    # pylint: disable=too-many-instance-attributes
96
    class _State(enum.Enum):
        """States of the client-side transfer state machine."""

        # Transfer is starting. The server and client are performing an initial
        # handshake and negotiating protocol and feature flags.
        INITIATING = 0

        # Waiting for the other end to send a chunk.
        WAITING = 1

        # Transmitting a window of data to a receiver.
        TRANSMITTING = 2

        # Recovering after one or more chunks were dropped in an active
        # transfer.
        RECOVERY = 3

        # Transfer has completed locally and is waiting for the peer to
        # acknowledge its final status. Only entered by the terminating side of
        # the transfer.
        #
        # The context remains in a TERMINATING state until it receives an
        # acknowledgement from the peer or times out.
        TERMINATING = 4

        # A transfer has fully completed.
        COMPLETE = 5
121
122    def __init__(  # pylint: disable=too-many-arguments
123        self,
124        session_id: int,
125        resource_id: int,
126        send_chunk: Callable[[Chunk], None],
127        end_transfer: Callable[['Transfer'], None],
128        response_timeout_s: float,
129        initial_response_timeout_s: float,
130        max_retries: int,
131        max_lifetime_retries: int,
132        protocol_version: ProtocolVersion,
133        progress_callback: ProgressCallback | None = None,
134        initial_offset: int = 0,
135    ):
136        self.status = Status.OK
137        self.done = threading.Event()
138
139        self._session_id = session_id
140        self._resource_id = resource_id
141        self._offset = initial_offset
142
143        self._send_chunk_fn = send_chunk
144        self._end_transfer = end_transfer
145
146        self._desired_protocol_version = protocol_version
147        self._configured_protocol_version = ProtocolVersion.UNKNOWN
148
149        if self._desired_protocol_version is ProtocolVersion.LEGACY:
150            # In a legacy transfer, there is no protocol negotiation stage.
151            # Automatically configure the context to run the legacy protocol and
152            # proceed to waiting for a chunk.
153            self._configured_protocol_version = ProtocolVersion.LEGACY
154            self._state = Transfer._State.WAITING
155            self._session_id = self._resource_id
156        else:
157            self._state = Transfer._State.INITIATING
158
159        self._last_chunk: Chunk | None = None
160
161        self._retries = 0
162        self._max_retries = max_retries
163        self._lifetime_retries = 0
164        self._max_lifetime_retries = max_lifetime_retries
165        self._response_timer = _Timer(response_timeout_s, self._on_timeout)
166        self._initial_response_timeout_s = initial_response_timeout_s
167
168        self._progress_callback = progress_callback
169
170    async def begin(self) -> None:
171        """Sends the initial chunk of the transfer."""
172
173        if (
174            self._desired_protocol_version is ProtocolVersion.UNKNOWN
175            or self._desired_protocol_version.value
176            > ProtocolVersion.LATEST.value
177        ):
178            _LOG.error(
179                'Cannot start a transfer with unsupported protocol version %d',
180                self._desired_protocol_version.value,
181            )
182            self.finish(Status.INVALID_ARGUMENT)
183            return
184
185        initial_chunk = Chunk(
186            self._desired_protocol_version,
187            Chunk.Type.START,
188            resource_id=self._resource_id,
189        )
190
191        if self._offset != 0:
192            initial_chunk.initial_offset = self._offset
193
194        if self._desired_protocol_version is ProtocolVersion.VERSION_TWO:
195            initial_chunk.desired_session_id = self._session_id
196
197        # Regardless of the desired protocol version, set any additional fields
198        # on the opening chunk, in case the server only runs legacy.
199        self._set_initial_chunk_fields(initial_chunk)
200
201        self._send_chunk(initial_chunk)
202        self._response_timer.start(self._initial_response_timeout_s)
203
    @property
    def id(self) -> int:
        """Returns the identifier for the active transfer.

        This is the session ID; for a legacy transfer it is the resource ID,
        or the session ID of the first legacy chunk received after falling
        back from a newer protocol.
        """
        return self._session_id
208
    @property
    def resource_id(self) -> int:
        """Returns the identifier of the resource being transferred."""
        return self._resource_id
213
    @property
    @abc.abstractmethod
    def data(self) -> bytes:
        """Returns the data read or written in this transfer.

        finish() uses the length of this value for the final progress update.
        """
218
    @abc.abstractmethod
    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
        """Sets fields for the initial non-handshake chunk of the transfer.

        Called on the opening START chunk and on the START_ACK_CONFIRMATION
        chunk before each is sent.
        """
222
    def _send_chunk(self, chunk: Chunk) -> None:
        """Sends a chunk to the server, keeping track of the last chunk sent."""
        self._send_chunk_fn(chunk)
        # Remember the chunk so that it can be resent if a timeout occurs.
        self._last_chunk = chunk
227
228    async def handle_chunk(self, chunk: Chunk) -> None:
229        """Processes an incoming chunk from the server.
230
231        Handles terminating chunks (i.e. those with a status) and forwards
232        non-terminating chunks to handle_data_chunk.
233        """
234        self._response_timer.stop()
235        self._retries = 0  # Received data from service, so reset the retries.
236
237        _LOG.debug('Received chunk\n%s', str(chunk.to_message()).rstrip())
238
239        # Status chunks are only used to terminate a transfer. They do not
240        # contain any data that requires processing.
241        if chunk.status is not None:
242            if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
243                self._send_chunk(
244                    Chunk(
245                        self._configured_protocol_version,
246                        Chunk.Type.COMPLETION_ACK,
247                        session_id=self._session_id,
248                    )
249                )
250
251            self.finish(Status(chunk.status))
252            return
253
254        if self._state is Transfer._State.INITIATING:
255            await self._perform_initial_handshake(chunk)
256        elif self._state is Transfer._State.TERMINATING:
257            if chunk.type is Chunk.Type.COMPLETION_ACK:
258                self.finish(self.status)
259            else:
260                # Expecting a completion ACK but didn't receive one. Go through
261                # the retry process.
262                self._on_timeout()
263        # Only ignoring START_ACK, tests were unhappy with other non-data chunks
264        elif chunk.type not in [Chunk.Type.START_ACK]:
265            await self._handle_data_chunk(chunk)
266        else:
267            _LOG.warning("Ignoring extra START_ACK chunk")
268            return
269
270        # Start the timeout for the server to send a chunk in response.
271        self._response_timer.start()
272
273    async def _perform_initial_handshake(self, chunk: Chunk) -> None:
274        """Progresses the initial handshake phase of a v2+ transfer."""
275        assert self._state is Transfer._State.INITIATING
276
277        # If a non-handshake chunk is received during an INITIATING state, the
278        # transfer server is running a legacy protocol version, which does not
279        # perform a handshake. End the handshake, revert to the legacy protocol,
280        # and process the chunk appropriately.
281        if chunk.type is not Chunk.Type.START_ACK:
282            _LOG.debug(
283                'Transfer %d got non-handshake chunk, reverting to legacy',
284                self.id,
285            )
286
287            if self._offset != 0:
288                _LOG.error(
289                    'Non-zero offset transfers not supported by legacy protocol'
290                )
291                self.finish(Status.INTERNAL)
292                return
293
294            self._configured_protocol_version = ProtocolVersion.LEGACY
295            self._state = Transfer._State.WAITING
296
297            # Update the transfer's session ID, which will map to the
298            # transfer_id of the legacy chunk.
299            self._session_id = chunk.session_id
300
301            await self._handle_data_chunk(chunk)
302            return
303
304        self._configured_protocol_version = ProtocolVersion(
305            min(
306                self._desired_protocol_version.value,
307                chunk.protocol_version.value,
308            )
309        )
310        _LOG.debug(
311            'Transfer %d negotiating protocol version: ours=%d, theirs=%d',
312            self.id,
313            self._desired_protocol_version.value,
314            chunk.protocol_version.value,
315        )
316
317        if self._offset != chunk.initial_offset:
318            # If our offsets don't match, let user handle it
319            self.finish(Status.UNIMPLEMENTED)
320            return
321
322        # Send a confirmation chunk to the server accepting the assigned session
323        # ID and protocol version. Tag any initial transfer parameters onto the
324        # chunk to begin the data transfer.
325        start_ack_confirmation = Chunk(
326            self._configured_protocol_version,
327            Chunk.Type.START_ACK_CONFIRMATION,
328            session_id=self._session_id,
329            offset=self._offset,
330        )
331
332        self._set_initial_chunk_fields(start_ack_confirmation)
333
334        self._state = Transfer._State.WAITING
335        self._send_chunk(start_ack_confirmation)
336
    @abc.abstractmethod
    async def _handle_data_chunk(self, chunk: Chunk) -> None:
        """Handles a chunk that contains or requests data.

        Called by handle_chunk() for chunks received during the data phase,
        and for the first chunk received from a legacy server.
        """
340
    @abc.abstractmethod
    def _retry_after_data_timeout(self) -> None:
        """Retries after a timeout occurs during the data transfer phase.

        Only invoked when in the data transfer phase (i.e. state is in
        {WAITING, TRANSMITTING, RECOVERY}). Timeouts occurring during an
        opening or closing handshake are handled by the base Transfer, which
        resends the last chunk itself.
        """
349
350    def _on_timeout(self) -> None:
351        """Handles a timeout while waiting for a chunk."""
352        if self._state is Transfer._State.COMPLETE:
353            return
354
355        self._retries += 1
356        self._lifetime_retries += 1
357
358        if (
359            self._retries > self._max_retries
360            or self._lifetime_retries > self._max_lifetime_retries
361        ):
362            if self._state is Transfer._State.TERMINATING:
363                # If the server never responded to the sent completion chunk,
364                # simply end the transfer locally with its original status.
365                self.finish(self.status)
366            else:
367                self.finish(Status.DEADLINE_EXCEEDED)
368            return
369
370        _LOG.debug(
371            'Received no responses for %.3fs; retrying %d/%d',
372            self._response_timer.timeout_s,
373            self._retries,
374            self._max_retries,
375        )
376
377        retry_handshake_chunk = self._state in (
378            Transfer._State.INITIATING,
379            Transfer._State.TERMINATING,
380        ) or (
381            self._last_chunk is not None
382            and self._last_chunk.type is Chunk.Type.START_ACK_CONFIRMATION
383        )
384
385        if retry_handshake_chunk:
386            assert self._last_chunk is not None
387            self._send_chunk(self._last_chunk)
388        else:
389            self._retry_after_data_timeout()
390
391        if (
392            self._last_chunk is not None
393            and self._last_chunk.type is Chunk.Type.START
394        ):
395            self._response_timer.start(self._initial_response_timeout_s)
396        else:
397            self._response_timer.start()
398
    def finish(self, status: Status, skip_callback: bool = False) -> None:
        """Ends the transfer with the specified status.

        Stops the response timer, records the status, reports a final progress
        update when the status is OK, invokes the end_transfer callback, and
        finally marks the transfer COMPLETE and sets the done event.

        Args:
            status: Final status of the transfer.
            skip_callback: If True, the end_transfer callback given to the
                constructor is not invoked.
        """
        self._response_timer.stop()
        self.status = status

        if status.ok():
            # On success, report the whole payload as sent and confirmed.
            total_size = len(self.data)
            self._update_progress(total_size, total_size, total_size)

        if not skip_callback:
            self._end_transfer(self)

        # Set done last so that the transfer has been fully cleaned up.
        self._state = Transfer._State.COMPLETE
        self.done.set()
414
415    def _update_progress(
416        self,
417        bytes_sent: int,
418        bytes_confirmed_received: int,
419        total_size_bytes: int | None,
420    ) -> None:
421        """Invokes the provided progress callback, if any, with the progress."""
422
423        stats = ProgressStats(
424            bytes_sent, bytes_confirmed_received, total_size_bytes
425        )
426        _LOG.debug('Transfer %d progress: %s', self.id, stats)
427
428        if self._progress_callback:
429            self._progress_callback(stats)
430
431    def _send_final_chunk(self, status: Status) -> None:
432        """Sends a status chunk to the server and finishes the transfer."""
433        self._send_chunk(
434            Chunk(
435                self._configured_protocol_version,
436                Chunk.Type.COMPLETION,
437                session_id=self.id,
438                status=status,
439            )
440        )
441
442        if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
443            # Wait for a completion ACK from the server.
444            self.status = status
445            self._state = Transfer._State.TERMINATING
446            self._response_timer.start()
447        else:
448            self.finish(status)
449
450
451class WriteTransfer(Transfer):
452    """A client -> server write transfer."""
453
454    def __init__(  # pylint: disable=too-many-arguments
455        self,
456        session_id: int,
457        resource_id: int,
458        data: bytes,
459        send_chunk: Callable[[Chunk], None],
460        end_transfer: Callable[[Transfer], None],
461        response_timeout_s: float,
462        initial_response_timeout_s: float,
463        max_retries: int,
464        max_lifetime_retries: int,
465        protocol_version: ProtocolVersion,
466        progress_callback: ProgressCallback | None = None,
467        initial_offset: int = 0,
468    ):
469        super().__init__(
470            session_id,
471            resource_id,
472            send_chunk,
473            end_transfer,
474            response_timeout_s,
475            initial_response_timeout_s,
476            max_retries,
477            max_lifetime_retries,
478            protocol_version,
479            progress_callback,
480            initial_offset=initial_offset,
481        )
482        self._data = data
483        self.initial_offset = initial_offset
484
485        self._window_end_offset = 0
486        self._max_chunk_size = 0
487        self._chunk_delay_us: int | None = None
488
489        # The window ID increments for each parameters update.
490        self._window_id = 0
491
492        self._bytes_confirmed_received = 0
493
    @property
    def data(self) -> bytes:
        """Returns the payload passed to the constructor."""
        return self._data
497
    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
        """Leaves the chunk untouched."""
        # Nothing to tag onto the initial chunk in a write transfer.
        pass
501
502    async def _handle_data_chunk(self, chunk: Chunk) -> None:
503        """Processes an incoming chunk from the server.
504
505        In a write transfer, the server only sends transfer parameter updates
506        to the client. When a message is received, update local parameters and
507        send data accordingly.
508        """
509
510        if self._state is Transfer._State.TRANSMITTING:
511            self._state = Transfer._State.WAITING
512
513        assert self._state is Transfer._State.WAITING
514
515        if not self._handle_parameters_update(chunk):
516            return
517
518        self._bytes_confirmed_received = chunk.offset
519        self._state = Transfer._State.TRANSMITTING
520
521        self._window_id += 1
522        asyncio.create_task(self._transmit_next_chunk(self._window_id))
523
524    async def _transmit_next_chunk(
525        self, window_id: int, timeout_us: int | None = None
526    ) -> None:
527        """Transmits a single data chunk to the server.
528
529        If the chunk completes the active window, returns to a WAITING state.
530        Otherwise, schedules another transmission for the next chunk.
531        """
532        if timeout_us is not None:
533            await asyncio.sleep(timeout_us / 1e6)
534
535        if self._state is not Transfer._State.TRANSMITTING:
536            return
537
538        if window_id != self._window_id:
539            _LOG.debug('Transfer %d: Skipping stale window', self.id)
540            return
541
542        chunk = self._next_chunk()
543        self._offset += len(chunk.data)
544
545        sent_requested_bytes = self._offset == self._window_end_offset
546
547        self._send_chunk(chunk)
548        self._update_progress(
549            self._offset,
550            self._bytes_confirmed_received,
551            len(self.data) + self.initial_offset,
552        )
553
554        if sent_requested_bytes:
555            self._state = Transfer._State.WAITING
556        else:
557            asyncio.create_task(
558                self._transmit_next_chunk(
559                    window_id, timeout_us=self._chunk_delay_us
560                )
561            )
562
563    def _handle_parameters_update(self, chunk: Chunk) -> bool:
564        """Updates transfer state based on a transfer parameters update."""
565
566        if chunk.offset > len(self.data) + self.initial_offset:
567            # Bad offset; terminate the transfer.
568            _LOG.error(
569                'Transfer %d: server requested invalid offset %d (size %d)',
570                self.id,
571                chunk.offset,
572                len(self.data) + self.initial_offset,
573            )
574
575            self._send_final_chunk(Status.OUT_OF_RANGE)
576            return False
577
578        if chunk.offset == chunk.window_end_offset:
579            _LOG.error(
580                'Transfer %d: service requested 0 bytes (invalid); aborting',
581                self.id,
582            )
583            self._send_final_chunk(Status.INTERNAL)
584            return False
585
586        # Extend the window to the new end offset specified by the server.
587        self._window_end_offset = min(
588            chunk.window_end_offset, len(self.data) + self.initial_offset
589        )
590
591        if chunk.requests_transmission_from_offset():
592            # Check whether the client has sent a previous data offset, which
593            # indicates that some chunks were lost in transmission.
594            if chunk.offset < self._offset:
595                _LOG.debug(
596                    'Write transfer %d rolling back: offset %d from %d',
597                    self.id,
598                    chunk.offset,
599                    self._offset,
600                )
601
602            self._offset = chunk.offset
603
604        if chunk.max_chunk_size_bytes is not None:
605            self._max_chunk_size = chunk.max_chunk_size_bytes
606
607        if chunk.min_delay_microseconds is not None:
608            self._chunk_delay_us = chunk.min_delay_microseconds
609
610        return True
611
612    def _retry_after_data_timeout(self) -> None:
613        if (
614            self._state is Transfer._State.WAITING
615            and self._last_chunk is not None
616        ):
617            self._send_chunk(self._last_chunk)
618
619    def _next_chunk(self) -> Chunk:
620        """Returns the next Chunk message to send in the data transfer."""
621        chunk = Chunk(
622            self._configured_protocol_version,
623            Chunk.Type.DATA,
624            session_id=self.id,
625            offset=self._offset,
626        )
627
628        max_bytes_in_chunk = min(
629            self._max_chunk_size, self._window_end_offset - self._offset
630        )
631        chunk.data = self.data[
632            self._offset
633            - self.initial_offset : self._offset
634            - self.initial_offset
635            + max_bytes_in_chunk
636        ]
637
638        # Mark the final chunk of the transfer.
639        if (
640            len(self.data) - self._offset + self.initial_offset
641            <= max_bytes_in_chunk
642        ):
643            chunk.remaining_bytes = 0
644
645        return chunk
646
647
648class ReadTransfer(Transfer):
649    """A client <- server read transfer.
650
651    Although Python can effectively handle an unlimited transfer window, this
652    client sets a conservative window and chunk size to avoid overloading the
653    device. These are configurable in the constructor.
654    """
655
656    # pylint: disable=too-many-instance-attributes
657
658    # The fractional position within a window at which a receive transfer should
659    # extend its window size to minimize the amount of time the transmitter
660    # spends blocked.
661    #
    # For example, a divisor of 2 will extend the window once at most half of
    # it remains to be received, a divisor of 3 once at most a third of it
    # remains, and so on.
665    EXTEND_WINDOW_DIVISOR = 2
666
    # Slow start and congestion avoidance are analogous to the equally named
    # phases in TCP congestion control.
    class _TransmitPhase(enum.Enum):
        """Phase of the read transfer's window sizing."""

        SLOW_START = 0
        CONGESTION_AVOIDANCE = 1
672
    # The type of data transmission the transfer is requesting.
    class _TransmitAction(enum.Enum):
        """Kind of transmission requested by a transfer parameters chunk."""

        # Immediate parameters sent at the start of a new transfer for legacy
        # compatibility.
        BEGIN = 0

        # Initial parameters chunk following the opening handshake.
        FIRST_PARAMETERS = 1

        # Extend the current transmission window.
        EXTEND = 2

        # Rewind the transfer to a certain offset following data loss.
        RETRANSMIT = 3
687
688    def __init__(  # pylint: disable=too-many-arguments
689        self,
690        session_id: int,
691        resource_id: int,
692        send_chunk: Callable[[Chunk], None],
693        end_transfer: Callable[[Transfer], None],
694        response_timeout_s: float,
695        initial_response_timeout_s: float,
696        max_retries: int,
697        max_lifetime_retries: int,
698        protocol_version: ProtocolVersion,
699        max_window_size_bytes: int = 32768,
700        max_chunk_size: int = 1024,
701        chunk_delay_us: int | None = None,
702        progress_callback: ProgressCallback | None = None,
703        initial_offset: int = 0,
704    ):
705        super().__init__(
706            session_id,
707            resource_id,
708            send_chunk,
709            end_transfer,
710            response_timeout_s,
711            initial_response_timeout_s,
712            max_retries,
713            max_lifetime_retries,
714            protocol_version,
715            progress_callback,
716            initial_offset=initial_offset,
717        )
718        self._max_window_size_bytes = max_window_size_bytes
719        self._max_chunk_size = max_chunk_size
720        self._chunk_delay_us = chunk_delay_us
721
722        self._remaining_transfer_size: int | None = None
723        self._data = bytearray()
724        self._window_end_offset = max_chunk_size
725        self._window_size_multiplier = 1
726        self._window_size = self._max_chunk_size * self._window_size_multiplier
727        self._transmit_phase = ReadTransfer._TransmitPhase.SLOW_START
728        self._last_chunk_offset: int | None = None
729
    @property
    def data(self) -> bytes:
        """Returns an immutable copy of the data that has been read.

        A new bytes object is built from the internal buffer on every access.
        """
        return bytes(self._data)
734
    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
        """Adds the transfer parameters for a BEGIN action to the chunk."""
        self._update_and_set_transfer_parameters(
            chunk, ReadTransfer._TransmitAction.BEGIN
        )
739
740    async def _handle_data_chunk(self, chunk: Chunk) -> None:
741        """Processes an incoming chunk from the server.
742
743        In a read transfer, the client receives data chunks from the server.
744        Once all pending data is received, the transfer parameters are updated.
745        """
746
747        if self._state is Transfer._State.RECOVERY:
748            if chunk.offset != self._offset:
749                if self._last_chunk_offset == chunk.offset:
750                    _LOG.debug(
751                        'Transfer %d received repeated offset %d: '
752                        'retry detected, resending transfer parameters',
753                        self.id,
754                        chunk.offset,
755                    )
756                    self._send_chunk(
757                        self._transfer_parameters(
758                            ReadTransfer._TransmitAction.RETRANSMIT
759                        )
760                    )
761                else:
762                    _LOG.debug(
763                        'Transfer %d waiting for offset %d, ignoring %d',
764                        self.id,
765                        self._offset,
766                        chunk.offset,
767                    )
768                    self._last_chunk_offset = chunk.offset
769                return
770
771            _LOG.info(
772                'Transfer %d received expected offset %d, resuming transfer',
773                self.id,
774                chunk.offset,
775            )
776            self._state = Transfer._State.WAITING
777
778        assert self._state is Transfer._State.WAITING
779
780        if chunk.offset != self._offset:
781            if chunk.offset + len(chunk.data) <= self._offset:
782                # If the chunk's data has already been received, don't go
783                # through a full recovery cycle to avoid shrinking the window
784                # size and potentially thrashing. The expected data may already
785                # be in-flight, so just allow the transmitter to keep going with
786                # a CONTINUE parameters chunk.
787                _LOG.debug(
788                    'Transfer %d received duplicate chunk with offset %d',
789                    self.id,
790                    chunk.offset,
791                )
792                self._send_chunk(
793                    self._transfer_parameters(
794                        ReadTransfer._TransmitAction.EXTEND,
795                        update=False,
796                    )
797                )
798            else:
799                # Initially, the transfer service only supports in-order
800                # transfers. If data is received out of order, request that the
801                # server retransmit from the previous offset.
802                _LOG.debug(
803                    'Transfer %d expected offset %d, received %d: '
804                    'entering recovery state',
805                    self.id,
806                    self._offset,
807                    chunk.offset,
808                )
809                self._state = Transfer._State.RECOVERY
810
811                self._send_chunk(
812                    self._transfer_parameters(
813                        ReadTransfer._TransmitAction.RETRANSMIT
814                    )
815                )
816            return
817
818        self._data += chunk.data
819        self._offset += len(chunk.data)
820
821        # Update the last offset seen so that retries can be detected.
822        self._last_chunk_offset = chunk.offset
823
824        if chunk.remaining_bytes is not None:
825            if chunk.remaining_bytes == 0:
826                # No more data to read. Acknowledge receipt and finish.
827                self._send_final_chunk(Status.OK)
828                return
829
830            # The server may indicate if the amount of remaining data is known.
831            self._remaining_transfer_size = chunk.remaining_bytes
832        elif self._remaining_transfer_size is not None:
833            # Update the remaining transfer size, if it is known.
834            self._remaining_transfer_size -= len(chunk.data)
835
836            # If the transfer size drops to zero, the estimate was inaccurate.
837            if self._remaining_transfer_size <= 0:
838                self._remaining_transfer_size = None
839
840        total_size = (
841            None
842            if self._remaining_transfer_size is None
843            else (self._remaining_transfer_size + self._offset)
844        )
845        self._update_progress(self._offset, self._offset, total_size)
846
847        if chunk.window_end_offset != 0:
848            if chunk.window_end_offset < self._offset:
849                _LOG.error(
850                    'Transfer %d: transmitter sent invalid earlier end offset '
851                    '%d (receiver offset %d)',
852                    self.id,
853                    chunk.window_end_offset,
854                    self._offset,
855                )
856                self._send_final_chunk(Status.INTERNAL)
857                return
858
859            if chunk.window_end_offset > self._window_end_offset:
860                _LOG.error(
861                    'Transfer %d: transmitter sent invalid later end offset '
862                    '%d (receiver end offset %d)',
863                    self.id,
864                    chunk.window_end_offset,
865                    self._window_end_offset,
866                )
867                self._send_final_chunk(Status.INTERNAL)
868                return
869
870            self._window_end_offset = chunk.window_end_offset
871
872        if self._offset == self._window_end_offset:
873            # All pending data was received. Send out a new parameters chunk for
874            # the next block.
875            self._send_chunk(
876                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
877            )
878            return
879
880        remaining_window_size = self._window_end_offset - self._offset
881        extend_window = (
882            remaining_window_size
883            <= self._window_size / ReadTransfer.EXTEND_WINDOW_DIVISOR
884        )
885
886        if extend_window:
887            self._send_chunk(
888                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
889            )
890
891    def _retry_after_data_timeout(self) -> None:
892        if (
893            self._state is Transfer._State.WAITING
894            or self._state is Transfer._State.RECOVERY
895        ):
896            self._send_chunk(
897                self._transfer_parameters(
898                    ReadTransfer._TransmitAction.RETRANSMIT
899                )
900            )
901
902    def _set_transfer_parameters(
903        self,
904        chunk: Chunk,
905    ) -> None:
906        chunk.offset = self._offset
907        chunk.window_end_offset = self._window_end_offset
908        chunk.max_chunk_size_bytes = self._max_chunk_size
909
910        if self._chunk_delay_us:
911            chunk.min_delay_microseconds = self._chunk_delay_us
912
913    def _update_and_set_transfer_parameters(
914        self, chunk: Chunk, action: 'ReadTransfer._TransmitAction'
915    ) -> None:
916        if action is ReadTransfer._TransmitAction.EXTEND:
917            # Window was received successfully without packet loss and should
918            # grow. Double the window size during slow start, or increase it by
919            # a single chunk in congestion avoidance.
920            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
921                self._window_size_multiplier *= 2
922            else:
923                self._window_size_multiplier += 1
924
925            # The window size can never exceed the user-specified maximum bytes.
926            # If it does, reduce the multiplier to the largest size that fits.
927            if (
928                self._window_size_multiplier * self._max_chunk_size
929                > self._max_window_size_bytes
930            ):
931                self._window_size_multiplier = (
932                    self._max_window_size_bytes // self._max_chunk_size
933                )
934
935        elif action is ReadTransfer._TransmitAction.RETRANSMIT:
936            # A packet was lost: shrink the window size. Additionally, after the
937            # first packet loss, transition from the slow start to the
938            # congestion avoidance phase of the transfer.
939            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
940                self._transmit_phase = (
941                    ReadTransfer._TransmitPhase.CONGESTION_AVOIDANCE
942                )
943            self._window_size_multiplier = max(
944                self._window_size_multiplier // 2, 1
945            )
946
947        self._window_size = min(
948            self._max_chunk_size * self._window_size_multiplier,
949            self._max_window_size_bytes,
950        )
951
952        self._window_end_offset = self._offset + self._window_size
953        self._set_transfer_parameters(chunk)
954
955    def _transfer_parameters(
956        self,
957        action: 'ReadTransfer._TransmitAction',
958        update: bool = True,
959    ) -> Chunk:
960        """Returns an updated transfer parameters chunk."""
961
962        if action is ReadTransfer._TransmitAction.BEGIN:
963            chunk_type = Chunk.Type.START
964        elif action is ReadTransfer._TransmitAction.EXTEND:
965            chunk_type = Chunk.Type.PARAMETERS_CONTINUE
966        else:
967            chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT
968
969        chunk = Chunk(
970            self._configured_protocol_version, chunk_type, session_id=self.id
971        )
972
973        if update:
974            self._update_and_set_transfer_parameters(chunk, action)
975        else:
976            self._set_transfer_parameters(chunk)
977
978        return chunk
979