xref: /aosp_15_r20/external/pigweed/pw_transfer/py/tests/transfer_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for the transfer service client."""
16
17import enum
18import math
19import os
20import unittest
21from typing import Iterable
22
23from pw_status import Status
24from pw_rpc import callback_client, client, ids, packets
25from pw_rpc.internal import packet_pb2
26
27import pw_transfer
28from pw_transfer import ProtocolVersion
29
30from pw_transfer import transfer_pb2
31
32_TRANSFER_SERVICE_ID = ids.calculate('pw.transfer.Transfer')
33_FIRST_SESSION_ID = 1
34_ARBITRARY_TRANSFER_ID = 66
35
36# If the default timeout is too short, some tests become flaky on Windows.
37DEFAULT_TIMEOUT_S = 0.3
38
39
class _Method(enum.Enum):
    """Transfer service RPC methods, valued by their computed method IDs."""

    # Each value is the ID that `ids.calculate` derives from the method name.
    READ = ids.calculate('Read')
    WRITE = ids.calculate('Write')
43
44
45# pylint: disable=missing-function-docstring, missing-class-docstring
46
47
48class TransferManagerTest(unittest.TestCase):
49    # pylint: disable=too-many-public-methods
50    """Tests for the transfer manager."""
51
52    def setUp(self) -> None:
53        self._client = client.Client.from_modules(
54            callback_client.Impl(),
55            [client.Channel(1, self._handle_request)],
56            (transfer_pb2,),
57        )
58        self._service = self._client.channel(1).rpcs.pw.transfer.Transfer
59
60        self._sent_chunks: list[transfer_pb2.Chunk] = []
61        self._packets_to_send: list[list[packet_pb2.RpcPacket]] = []
62
63    def _enqueue_server_responses(
64        self, method: _Method, responses: Iterable[Iterable[transfer_pb2.Chunk]]
65    ) -> None:
66        for group in responses:
67            serialized_group = []
68            for response in group:
69                serialized_group.append(
70                    packet_pb2.RpcPacket(
71                        type=packet_pb2.PacketType.SERVER_STREAM,
72                        channel_id=1,
73                        service_id=_TRANSFER_SERVICE_ID,
74                        method_id=method.value,
75                        status=Status.OK.value,
76                        payload=response.SerializeToString(),
77                    )
78                )
79            self._packets_to_send.append(serialized_group)
80
81    def _enqueue_server_error(self, method: _Method, error: Status) -> None:
82        self._packets_to_send.append(
83            [
84                packet_pb2.RpcPacket(
85                    type=packet_pb2.PacketType.SERVER_ERROR,
86                    channel_id=1,
87                    service_id=_TRANSFER_SERVICE_ID,
88                    method_id=method.value,
89                    status=error.value,
90                )
91            ]
92        )
93
94    def _handle_request(self, data: bytes) -> None:
95        packet = packets.decode(data)
96        if packet.type is not packet_pb2.PacketType.CLIENT_STREAM:
97            return
98
99        chunk = transfer_pb2.Chunk()
100        chunk.MergeFromString(packet.payload)
101        self._sent_chunks.append(chunk)
102
103        if self._packets_to_send:
104            responses = self._packets_to_send.pop(0)
105            for response in responses:
106                response.call_id = packet.call_id
107                self._client.process_packet(response.SerializeToString())
108
109    def _received_data(self) -> bytearray:
110        data = bytearray()
111        for chunk in self._sent_chunks:
112            data.extend(chunk.data)
113        return data
114
115    def test_read_transfer_basic(self):
116        manager = pw_transfer.Manager(
117            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
118        )
119
120        self._enqueue_server_responses(
121            _Method.READ,
122            (
123                (
124                    transfer_pb2.Chunk(
125                        transfer_id=3, offset=0, data=b'abc', remaining_bytes=0
126                    ),
127                ),
128            ),
129        )
130
131        data = manager.read(3)
132        self.assertEqual(data, b'abc')
133        self.assertEqual(len(self._sent_chunks), 2)
134        self.assertTrue(self._sent_chunks[-1].HasField('status'))
135        self.assertEqual(self._sent_chunks[-1].status, 0)
136
137    def test_read_transfer_multichunk(self) -> None:
138        manager = pw_transfer.Manager(
139            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
140        )
141
142        self._enqueue_server_responses(
143            _Method.READ,
144            (
145                (
146                    transfer_pb2.Chunk(
147                        transfer_id=3, offset=0, data=b'abc', remaining_bytes=3
148                    ),
149                    transfer_pb2.Chunk(
150                        transfer_id=3, offset=3, data=b'def', remaining_bytes=0
151                    ),
152                ),
153            ),
154        )
155
156        data = manager.read(3)
157        self.assertEqual(data, b'abcdef')
158        self.assertEqual(len(self._sent_chunks), 2)
159        self.assertTrue(self._sent_chunks[-1].HasField('status'))
160        self.assertEqual(self._sent_chunks[-1].status, 0)
161
162    def test_read_transfer_progress_callback(self) -> None:
163        manager = pw_transfer.Manager(
164            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
165        )
166
167        self._enqueue_server_responses(
168            _Method.READ,
169            (
170                (
171                    transfer_pb2.Chunk(
172                        transfer_id=3, offset=0, data=b'abc', remaining_bytes=3
173                    ),
174                    transfer_pb2.Chunk(
175                        transfer_id=3, offset=3, data=b'def', remaining_bytes=0
176                    ),
177                ),
178            ),
179        )
180
181        progress: list[pw_transfer.ProgressStats] = []
182
183        data = manager.read(3, progress.append)
184        self.assertEqual(data, b'abcdef')
185        self.assertEqual(len(self._sent_chunks), 2)
186        self.assertTrue(self._sent_chunks[-1].HasField('status'))
187        self.assertEqual(self._sent_chunks[-1].status, 0)
188        self.assertEqual(
189            progress,
190            [
191                pw_transfer.ProgressStats(3, 3, 6),
192                pw_transfer.ProgressStats(6, 6, 6),
193            ],
194        )
195
196    def test_read_transfer_retry_bad_offset(self) -> None:
197        """Server responds with an unexpected offset in a read transfer."""
198        manager = pw_transfer.Manager(
199            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
200        )
201
202        self._enqueue_server_responses(
203            _Method.READ,
204            (
205                (
206                    transfer_pb2.Chunk(
207                        transfer_id=3, offset=0, data=b'123', remaining_bytes=6
208                    ),
209                    # Incorrect offset; expecting 3.
210                    transfer_pb2.Chunk(
211                        transfer_id=3, offset=1, data=b'456', remaining_bytes=3
212                    ),
213                ),
214                (
215                    transfer_pb2.Chunk(
216                        transfer_id=3, offset=3, data=b'456', remaining_bytes=3
217                    ),
218                    transfer_pb2.Chunk(
219                        transfer_id=3, offset=6, data=b'789', remaining_bytes=0
220                    ),
221                ),
222            ),
223        )
224
225        data = manager.read(3)
226        self.assertEqual(data, b'123456789')
227
228        # Two transfer parameter requests should have been sent.
229        self.assertEqual(len(self._sent_chunks), 3)
230        self.assertTrue(self._sent_chunks[-1].HasField('status'))
231        self.assertEqual(self._sent_chunks[-1].status, 0)
232
233    def test_read_transfer_recovery_sends_parameters_on_retry(self) -> None:
234        """Server sends the same chunk twice (retry) in a read transfer."""
235        manager = pw_transfer.Manager(
236            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
237        )
238
239        self._enqueue_server_responses(
240            _Method.READ,
241            (
242                (
243                    # Bad offset, enter recovery state. Only one parameters
244                    # chunk should be sent.
245                    transfer_pb2.Chunk(
246                        transfer_id=3, offset=1, data=b'234', remaining_bytes=5
247                    ),
248                    transfer_pb2.Chunk(
249                        transfer_id=3, offset=4, data=b'567', remaining_bytes=2
250                    ),
251                    transfer_pb2.Chunk(
252                        transfer_id=3, offset=7, data=b'8', remaining_bytes=1
253                    ),
254                ),
255                (
256                    # Only one parameters chunk should be sent after the server
257                    # retries the same offset twice.
258                    transfer_pb2.Chunk(
259                        transfer_id=3, offset=1, data=b'234', remaining_bytes=5
260                    ),
261                    transfer_pb2.Chunk(
262                        transfer_id=3, offset=4, data=b'567', remaining_bytes=2
263                    ),
264                    transfer_pb2.Chunk(
265                        transfer_id=3, offset=7, data=b'8', remaining_bytes=1
266                    ),
267                    transfer_pb2.Chunk(
268                        transfer_id=3, offset=7, data=b'8', remaining_bytes=1
269                    ),
270                ),
271                (
272                    transfer_pb2.Chunk(
273                        transfer_id=3,
274                        offset=0,
275                        data=b'123456789',
276                        remaining_bytes=0,
277                    ),
278                ),
279            ),
280        )
281
282        data = manager.read(3)
283        self.assertEqual(data, b'123456789')
284
285        self.assertEqual(len(self._sent_chunks), 4)
286        self.assertEqual(
287            self._sent_chunks[0].type, transfer_pb2.Chunk.Type.START
288        )
289        self.assertEqual(self._sent_chunks[0].offset, 0)
290        self.assertEqual(
291            self._sent_chunks[1].type,
292            transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
293        )
294        self.assertEqual(self._sent_chunks[1].offset, 0)
295        self.assertEqual(
296            self._sent_chunks[2].type,
297            transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
298        )
299        self.assertEqual(self._sent_chunks[2].offset, 0)
300        self.assertEqual(
301            self._sent_chunks[3].type, transfer_pb2.Chunk.Type.COMPLETION
302        )
303
304    def test_read_transfer_retry_timeout(self) -> None:
305        """Server doesn't respond to read transfer parameters."""
306        manager = pw_transfer.Manager(
307            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
308        )
309
310        self._enqueue_server_responses(
311            _Method.READ,
312            (
313                (),  # Send nothing in response to the initial parameters.
314                (
315                    transfer_pb2.Chunk(
316                        transfer_id=3, offset=0, data=b'xyz', remaining_bytes=0
317                    ),
318                ),
319            ),
320        )
321
322        data = manager.read(3)
323        self.assertEqual(data, b'xyz')
324
325        # Two transfer parameter requests should have been sent.
326        self.assertEqual(len(self._sent_chunks), 3)
327        self.assertTrue(self._sent_chunks[-1].HasField('status'))
328        self.assertEqual(self._sent_chunks[-1].status, 0)
329
330    def test_read_transfer_lifetime_retries(self) -> None:
331        """Server doesn't respond several times during the transfer."""
332        manager = pw_transfer.Manager(
333            self._service,
334            default_response_timeout_s=DEFAULT_TIMEOUT_S,
335            max_retries=2**32 - 1,
336            max_lifetime_retries=4,
337        )
338
339        self._enqueue_server_responses(
340            _Method.READ,
341            (
342                (),  # Retry 1
343                (),  # Retry 2
344                (
345                    transfer_pb2.Chunk(  # Expected chunk.
346                        transfer_id=43, offset=0, data=b'xyz'
347                    ),
348                ),
349                # Don't send anything else. The maximum lifetime retry count
350                # should be hit.
351            ),
352        )
353
354        with self.assertRaises(pw_transfer.Error) as context:
355            manager.read(43)
356
357        self.assertEqual(len(self._sent_chunks), 5)
358
359        exception = context.exception
360        self.assertEqual(exception.resource_id, 43)
361        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
362
363    def test_read_transfer_timeout(self) -> None:
364        manager = pw_transfer.Manager(
365            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
366        )
367
368        with self.assertRaises(pw_transfer.Error) as context:
369            manager.read(27)
370
371        exception = context.exception
372        self.assertEqual(exception.resource_id, 27)
373        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
374
375        # The client should have sent four transfer parameters requests: one
376        # initial, and three retries.
377        self.assertEqual(len(self._sent_chunks), 4)
378
379    def test_read_transfer_error(self) -> None:
380        manager = pw_transfer.Manager(
381            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
382        )
383
384        self._enqueue_server_responses(
385            _Method.READ,
386            (
387                (
388                    transfer_pb2.Chunk(
389                        transfer_id=31, status=Status.NOT_FOUND.value
390                    ),
391                ),
392            ),
393        )
394
395        with self.assertRaises(pw_transfer.Error) as context:
396            manager.read(31)
397
398        exception = context.exception
399        self.assertEqual(exception.resource_id, 31)
400        self.assertEqual(exception.status, Status.NOT_FOUND)
401
402    def test_read_transfer_server_error(self) -> None:
403        manager = pw_transfer.Manager(
404            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
405        )
406
407        self._enqueue_server_error(_Method.READ, Status.NOT_FOUND)
408
409        with self.assertRaises(pw_transfer.Error) as context:
410            manager.read(31)
411
412        exception = context.exception
413        self.assertEqual(exception.resource_id, 31)
414        self.assertEqual(exception.status, Status.INTERNAL)
415
416    def test_read_transfer_adaptive_window_slow_start(self) -> None:
417        test_max_chunk_size = 16
418
419        manager = pw_transfer.Manager(
420            self._service,
421            default_response_timeout_s=DEFAULT_TIMEOUT_S,
422            max_chunk_size_bytes=test_max_chunk_size,
423            default_protocol_version=ProtocolVersion.LEGACY,
424        )
425
426        self._enqueue_server_responses(
427            _Method.READ,
428            (
429                # First window: 1 chunk.
430                (
431                    transfer_pb2.Chunk(
432                        transfer_id=_ARBITRARY_TRANSFER_ID,
433                        type=transfer_pb2.Chunk.Type.DATA,
434                        offset=0,
435                        data=b'#' * test_max_chunk_size,
436                    ),
437                ),
438                # Second window: 2 chunks.
439                (
440                    transfer_pb2.Chunk(
441                        transfer_id=_ARBITRARY_TRANSFER_ID,
442                        type=transfer_pb2.Chunk.Type.DATA,
443                        offset=test_max_chunk_size,
444                        data=b'#' * test_max_chunk_size,
445                    ),
446                    transfer_pb2.Chunk(
447                        transfer_id=_ARBITRARY_TRANSFER_ID,
448                        type=transfer_pb2.Chunk.Type.DATA,
449                        offset=2 * test_max_chunk_size,
450                        data=b'#' * test_max_chunk_size,
451                    ),
452                ),
453                # Third window: finish transfer.
454                (
455                    transfer_pb2.Chunk(
456                        transfer_id=_ARBITRARY_TRANSFER_ID,
457                        type=transfer_pb2.Chunk.Type.DATA,
458                        offset=3 * test_max_chunk_size,
459                        data=b'#' * test_max_chunk_size,
460                        remaining_bytes=0,
461                    ),
462                ),
463            ),
464        )
465
466        data = manager.read(_ARBITRARY_TRANSFER_ID)
467
468        self.assertEqual(
469            self._sent_chunks,
470            [
471                # First parameters: 1 chunk window.
472                transfer_pb2.Chunk(
473                    type=transfer_pb2.Chunk.Type.START,
474                    transfer_id=_ARBITRARY_TRANSFER_ID,
475                    resource_id=_ARBITRARY_TRANSFER_ID,
476                    pending_bytes=test_max_chunk_size,
477                    max_chunk_size_bytes=test_max_chunk_size,
478                    window_end_offset=test_max_chunk_size,
479                ),
480                # Second parameters: 2 chunk window.
481                transfer_pb2.Chunk(
482                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
483                    transfer_id=_ARBITRARY_TRANSFER_ID,
484                    offset=test_max_chunk_size,
485                    pending_bytes=2 * test_max_chunk_size,
486                    max_chunk_size_bytes=test_max_chunk_size,
487                    window_end_offset=(
488                        test_max_chunk_size + 2 * test_max_chunk_size
489                    ),
490                ),
491                # Third parameters: 4 chunk window.
492                transfer_pb2.Chunk(
493                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
494                    transfer_id=_ARBITRARY_TRANSFER_ID,
495                    offset=2 * test_max_chunk_size,
496                    pending_bytes=4 * test_max_chunk_size,
497                    max_chunk_size_bytes=test_max_chunk_size,
498                    window_end_offset=(
499                        2 * test_max_chunk_size + 4 * test_max_chunk_size
500                    ),
501                ),
502                transfer_pb2.Chunk(
503                    type=transfer_pb2.Chunk.Type.COMPLETION,
504                    transfer_id=_ARBITRARY_TRANSFER_ID,
505                    status=Status.OK.value,
506                ),
507            ],
508        )
509        self.assertEqual(data, b'#' * (4 * test_max_chunk_size))
510
511    def test_read_transfer_adaptive_window_congestion_avoidance(self) -> None:
512        test_max_chunk_size = 16
513
514        manager = pw_transfer.Manager(
515            self._service,
516            default_response_timeout_s=DEFAULT_TIMEOUT_S,
517            max_chunk_size_bytes=test_max_chunk_size,
518            default_protocol_version=ProtocolVersion.LEGACY,
519        )
520
521        self._enqueue_server_responses(
522            _Method.READ,
523            (
524                # First window: 1 chunk.
525                (
526                    transfer_pb2.Chunk(
527                        transfer_id=_ARBITRARY_TRANSFER_ID,
528                        type=transfer_pb2.Chunk.Type.DATA,
529                        offset=0,
530                        data=b'#' * test_max_chunk_size,
531                    ),
532                ),
533                # Second window: 2 chunks.
534                (
535                    transfer_pb2.Chunk(
536                        transfer_id=_ARBITRARY_TRANSFER_ID,
537                        type=transfer_pb2.Chunk.Type.DATA,
538                        offset=test_max_chunk_size,
539                        data=b'#' * test_max_chunk_size,
540                    ),
541                    transfer_pb2.Chunk(
542                        transfer_id=_ARBITRARY_TRANSFER_ID,
543                        type=transfer_pb2.Chunk.Type.DATA,
544                        offset=2 * test_max_chunk_size,
545                        data=b'#' * test_max_chunk_size,
546                    ),
547                ),
548                # Third window: send the wrong offset, triggering a
549                # retransmission.
550                (
551                    transfer_pb2.Chunk(
552                        transfer_id=_ARBITRARY_TRANSFER_ID,
553                        type=transfer_pb2.Chunk.Type.DATA,
554                        offset=5 * test_max_chunk_size,
555                        data=b'#' * test_max_chunk_size,
556                    ),
557                ),
558                # Fourth window: send the expected offset.
559                (
560                    transfer_pb2.Chunk(
561                        transfer_id=_ARBITRARY_TRANSFER_ID,
562                        type=transfer_pb2.Chunk.Type.DATA,
563                        offset=3 * test_max_chunk_size,
564                        data=b'#' * test_max_chunk_size,
565                    ),
566                    transfer_pb2.Chunk(
567                        transfer_id=_ARBITRARY_TRANSFER_ID,
568                        type=transfer_pb2.Chunk.Type.DATA,
569                        offset=4 * test_max_chunk_size,
570                        data=b'#' * test_max_chunk_size,
571                    ),
572                ),
573                # Fifth window: finish the transfer.
574                (
575                    transfer_pb2.Chunk(
576                        transfer_id=_ARBITRARY_TRANSFER_ID,
577                        type=transfer_pb2.Chunk.Type.DATA,
578                        offset=5 * test_max_chunk_size,
579                        data=b'#' * test_max_chunk_size,
580                        remaining_bytes=0,
581                    ),
582                ),
583            ),
584        )
585
586        data = manager.read(_ARBITRARY_TRANSFER_ID)
587
588        self.assertEqual(
589            self._sent_chunks,
590            [
591                # First parameters: 1 chunk window.
592                transfer_pb2.Chunk(
593                    type=transfer_pb2.Chunk.Type.START,
594                    transfer_id=_ARBITRARY_TRANSFER_ID,
595                    resource_id=_ARBITRARY_TRANSFER_ID,
596                    pending_bytes=test_max_chunk_size,
597                    max_chunk_size_bytes=test_max_chunk_size,
598                    window_end_offset=test_max_chunk_size,
599                ),
600                # Second parameters: 2 chunk window.
601                transfer_pb2.Chunk(
602                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
603                    transfer_id=_ARBITRARY_TRANSFER_ID,
604                    offset=test_max_chunk_size,
605                    pending_bytes=2 * test_max_chunk_size,
606                    max_chunk_size_bytes=test_max_chunk_size,
607                    window_end_offset=(
608                        test_max_chunk_size + 2 * test_max_chunk_size
609                    ),
610                ),
611                # Third parameters: 4 chunk window.
612                transfer_pb2.Chunk(
613                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
614                    transfer_id=_ARBITRARY_TRANSFER_ID,
615                    offset=2 * test_max_chunk_size,
616                    pending_bytes=4 * test_max_chunk_size,
617                    max_chunk_size_bytes=test_max_chunk_size,
618                    window_end_offset=(
619                        2 * test_max_chunk_size + 4 * test_max_chunk_size
620                    ),
621                ),
622                # Fourth parameters: data loss, retransmit and halve window.
623                transfer_pb2.Chunk(
624                    type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
625                    transfer_id=_ARBITRARY_TRANSFER_ID,
626                    offset=3 * test_max_chunk_size,
627                    pending_bytes=2 * test_max_chunk_size,
628                    max_chunk_size_bytes=test_max_chunk_size,
629                    window_end_offset=(
630                        3 * test_max_chunk_size + 2 * test_max_chunk_size
631                    ),
632                ),
633                # Fifth parameters: in congestion avoidance, window size now
634                # only increases by one chunk instead of doubling.
635                transfer_pb2.Chunk(
636                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
637                    transfer_id=_ARBITRARY_TRANSFER_ID,
638                    offset=4 * test_max_chunk_size,
639                    pending_bytes=3 * test_max_chunk_size,
640                    max_chunk_size_bytes=test_max_chunk_size,
641                    window_end_offset=(
642                        4 * test_max_chunk_size + 3 * test_max_chunk_size
643                    ),
644                ),
645                transfer_pb2.Chunk(
646                    type=transfer_pb2.Chunk.Type.COMPLETION,
647                    transfer_id=_ARBITRARY_TRANSFER_ID,
648                    status=Status.OK.value,
649                ),
650            ],
651        )
652        self.assertEqual(data, b'#' * (6 * test_max_chunk_size))
653
654    def test_read_transfer_v2_adaptive_window_slow_start(self) -> None:
655        test_max_chunk_size = 16
656
657        manager = pw_transfer.Manager(
658            self._service,
659            default_response_timeout_s=DEFAULT_TIMEOUT_S,
660            max_chunk_size_bytes=test_max_chunk_size,
661            default_protocol_version=ProtocolVersion.VERSION_TWO,
662        )
663
664        self._enqueue_server_responses(
665            _Method.READ,
666            (
667                (
668                    transfer_pb2.Chunk(
669                        session_id=_FIRST_SESSION_ID,
670                        type=transfer_pb2.Chunk.Type.START_ACK,
671                        protocol_version=ProtocolVersion.VERSION_TWO.value,
672                    ),
673                ),
674                # First window: 1 chunk.
675                (
676                    transfer_pb2.Chunk(
677                        session_id=_FIRST_SESSION_ID,
678                        type=transfer_pb2.Chunk.Type.DATA,
679                        offset=0,
680                        data=b'#' * test_max_chunk_size,
681                    ),
682                ),
683                # Second window: 2 chunks.
684                (
685                    transfer_pb2.Chunk(
686                        session_id=_FIRST_SESSION_ID,
687                        type=transfer_pb2.Chunk.Type.DATA,
688                        offset=test_max_chunk_size,
689                        data=b'#' * test_max_chunk_size,
690                    ),
691                    transfer_pb2.Chunk(
692                        session_id=_FIRST_SESSION_ID,
693                        type=transfer_pb2.Chunk.Type.DATA,
694                        offset=2 * test_max_chunk_size,
695                        data=b'#' * test_max_chunk_size,
696                    ),
697                ),
698                # Third window: finish transfer.
699                (
700                    transfer_pb2.Chunk(
701                        session_id=_FIRST_SESSION_ID,
702                        type=transfer_pb2.Chunk.Type.DATA,
703                        offset=3 * test_max_chunk_size,
704                        data=b'#' * test_max_chunk_size,
705                        remaining_bytes=0,
706                    ),
707                ),
708                (
709                    transfer_pb2.Chunk(
710                        session_id=_FIRST_SESSION_ID,
711                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
712                    ),
713                ),
714            ),
715        )
716
717        data = manager.read(_ARBITRARY_TRANSFER_ID)
718
719        self.assertEqual(
720            self._sent_chunks,
721            [
722                transfer_pb2.Chunk(
723                    transfer_id=_ARBITRARY_TRANSFER_ID,
724                    resource_id=_ARBITRARY_TRANSFER_ID,
725                    desired_session_id=_FIRST_SESSION_ID,
726                    pending_bytes=test_max_chunk_size,
727                    max_chunk_size_bytes=test_max_chunk_size,
728                    window_end_offset=test_max_chunk_size,
729                    type=transfer_pb2.Chunk.Type.START,
730                    protocol_version=ProtocolVersion.VERSION_TWO.value,
731                ),
732                # First parameters: 1 chunk window.
733                transfer_pb2.Chunk(
734                    session_id=_FIRST_SESSION_ID,
735                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
736                    offset=0,
737                    max_chunk_size_bytes=test_max_chunk_size,
738                    window_end_offset=test_max_chunk_size,
739                    protocol_version=ProtocolVersion.VERSION_TWO.value,
740                ),
741                # Second parameters: 2 chunk window.
742                transfer_pb2.Chunk(
743                    session_id=_FIRST_SESSION_ID,
744                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
745                    offset=test_max_chunk_size,
746                    max_chunk_size_bytes=test_max_chunk_size,
747                    window_end_offset=(
748                        test_max_chunk_size + 2 * test_max_chunk_size
749                    ),
750                ),
751                # Third parameters: 4 chunk window.
752                transfer_pb2.Chunk(
753                    session_id=_FIRST_SESSION_ID,
754                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
755                    offset=2 * test_max_chunk_size,
756                    max_chunk_size_bytes=test_max_chunk_size,
757                    window_end_offset=(
758                        2 * test_max_chunk_size + 4 * test_max_chunk_size
759                    ),
760                ),
761                transfer_pb2.Chunk(
762                    session_id=_FIRST_SESSION_ID,
763                    type=transfer_pb2.Chunk.Type.COMPLETION,
764                    status=Status.OK.value,
765                ),
766            ],
767        )
768        self.assertEqual(data, b'#' * (4 * test_max_chunk_size))
769
770    def test_read_transfer_v2_adaptive_window_congestion_avoidance(
771        self,
772    ) -> None:
773        test_max_chunk_size = 16
774
775        manager = pw_transfer.Manager(
776            self._service,
777            default_response_timeout_s=DEFAULT_TIMEOUT_S,
778            max_chunk_size_bytes=test_max_chunk_size,
779            default_protocol_version=ProtocolVersion.VERSION_TWO,
780        )
781
782        self._enqueue_server_responses(
783            _Method.READ,
784            (
785                (
786                    transfer_pb2.Chunk(
787                        session_id=_FIRST_SESSION_ID,
788                        type=transfer_pb2.Chunk.Type.START_ACK,
789                        protocol_version=ProtocolVersion.VERSION_TWO.value,
790                    ),
791                ),
792                # First window: 1 chunk.
793                (
794                    transfer_pb2.Chunk(
795                        session_id=_FIRST_SESSION_ID,
796                        type=transfer_pb2.Chunk.Type.DATA,
797                        offset=0,
798                        data=b'#' * test_max_chunk_size,
799                    ),
800                ),
801                # Second window: 2 chunks.
802                (
803                    transfer_pb2.Chunk(
804                        session_id=_FIRST_SESSION_ID,
805                        type=transfer_pb2.Chunk.Type.DATA,
806                        offset=test_max_chunk_size,
807                        data=b'#' * test_max_chunk_size,
808                    ),
809                    transfer_pb2.Chunk(
810                        session_id=_FIRST_SESSION_ID,
811                        type=transfer_pb2.Chunk.Type.DATA,
812                        offset=2 * test_max_chunk_size,
813                        data=b'#' * test_max_chunk_size,
814                    ),
815                ),
816                # Third window: send the wrong offset, triggering a
817                # retransmission.
818                (
819                    transfer_pb2.Chunk(
820                        session_id=_FIRST_SESSION_ID,
821                        type=transfer_pb2.Chunk.Type.DATA,
822                        offset=5 * test_max_chunk_size,
823                        data=b'#' * test_max_chunk_size,
824                    ),
825                ),
826                # Fourth window: send the expected offset.
827                (
828                    transfer_pb2.Chunk(
829                        session_id=_FIRST_SESSION_ID,
830                        type=transfer_pb2.Chunk.Type.DATA,
831                        offset=3 * test_max_chunk_size,
832                        data=b'#' * test_max_chunk_size,
833                    ),
834                    transfer_pb2.Chunk(
835                        session_id=_FIRST_SESSION_ID,
836                        type=transfer_pb2.Chunk.Type.DATA,
837                        offset=4 * test_max_chunk_size,
838                        data=b'#' * test_max_chunk_size,
839                    ),
840                ),
841                # Fifth window: finish the transfer.
842                (
843                    transfer_pb2.Chunk(
844                        session_id=_FIRST_SESSION_ID,
845                        type=transfer_pb2.Chunk.Type.DATA,
846                        offset=5 * test_max_chunk_size,
847                        data=b'#' * test_max_chunk_size,
848                        remaining_bytes=0,
849                    ),
850                ),
851                (
852                    transfer_pb2.Chunk(
853                        session_id=_FIRST_SESSION_ID,
854                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
855                    ),
856                ),
857            ),
858        )
859
860        data = manager.read(_ARBITRARY_TRANSFER_ID)
861
862        self.assertEqual(
863            self._sent_chunks,
864            [
865                transfer_pb2.Chunk(
866                    type=transfer_pb2.Chunk.Type.START,
867                    transfer_id=_ARBITRARY_TRANSFER_ID,
868                    resource_id=_ARBITRARY_TRANSFER_ID,
869                    desired_session_id=_FIRST_SESSION_ID,
870                    pending_bytes=test_max_chunk_size,
871                    max_chunk_size_bytes=test_max_chunk_size,
872                    window_end_offset=test_max_chunk_size,
873                    protocol_version=ProtocolVersion.VERSION_TWO.value,
874                ),
875                # First parameters: 1 chunk window.
876                transfer_pb2.Chunk(
877                    session_id=_FIRST_SESSION_ID,
878                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
879                    offset=0,
880                    max_chunk_size_bytes=test_max_chunk_size,
881                    window_end_offset=test_max_chunk_size,
882                    protocol_version=ProtocolVersion.VERSION_TWO.value,
883                ),
884                # Second parameters: 2 chunk window.
885                transfer_pb2.Chunk(
886                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
887                    session_id=_FIRST_SESSION_ID,
888                    offset=test_max_chunk_size,
889                    max_chunk_size_bytes=test_max_chunk_size,
890                    window_end_offset=(
891                        test_max_chunk_size + 2 * test_max_chunk_size
892                    ),
893                ),
894                # Third parameters: 4 chunk window.
895                transfer_pb2.Chunk(
896                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
897                    session_id=_FIRST_SESSION_ID,
898                    offset=2 * test_max_chunk_size,
899                    max_chunk_size_bytes=test_max_chunk_size,
900                    window_end_offset=(
901                        2 * test_max_chunk_size + 4 * test_max_chunk_size
902                    ),
903                ),
904                # Fourth parameters: data loss, retransmit and halve window.
905                transfer_pb2.Chunk(
906                    type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
907                    session_id=_FIRST_SESSION_ID,
908                    offset=3 * test_max_chunk_size,
909                    max_chunk_size_bytes=test_max_chunk_size,
910                    window_end_offset=(
911                        3 * test_max_chunk_size + 2 * test_max_chunk_size
912                    ),
913                ),
914                # Fifth parameters: in congestion avoidance, window size now
915                # only increases by one chunk instead of doubling.
916                transfer_pb2.Chunk(
917                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
918                    session_id=_FIRST_SESSION_ID,
919                    offset=4 * test_max_chunk_size,
920                    max_chunk_size_bytes=test_max_chunk_size,
921                    window_end_offset=(
922                        4 * test_max_chunk_size + 3 * test_max_chunk_size
923                    ),
924                ),
925                transfer_pb2.Chunk(
926                    type=transfer_pb2.Chunk.Type.COMPLETION,
927                    session_id=_FIRST_SESSION_ID,
928                    status=Status.OK.value,
929                ),
930            ],
931        )
932        self.assertEqual(data, b'#' * (6 * test_max_chunk_size))
933
934    def test_write_transfer_basic(self) -> None:
935        manager = pw_transfer.Manager(
936            self._service,
937            default_response_timeout_s=DEFAULT_TIMEOUT_S,
938        )
939
940        self._enqueue_server_responses(
941            _Method.WRITE,
942            (
943                (
944                    transfer_pb2.Chunk(
945                        transfer_id=4,
946                        offset=0,
947                        pending_bytes=32,
948                        max_chunk_size_bytes=8,
949                    ),
950                ),
951                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
952            ),
953        )
954
955        manager.write(4, b'hello')
956        self.assertEqual(len(self._sent_chunks), 2)
957        self.assertEqual(self._received_data(), b'hello')
958
959    def test_write_transfer_max_chunk_size(self) -> None:
960        manager = pw_transfer.Manager(
961            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
962        )
963
964        self._enqueue_server_responses(
965            _Method.WRITE,
966            (
967                (
968                    transfer_pb2.Chunk(
969                        transfer_id=4,
970                        offset=0,
971                        pending_bytes=32,
972                        max_chunk_size_bytes=8,
973                    ),
974                ),
975                (),
976                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
977            ),
978        )
979
980        manager.write(4, b'hello world')
981        self.assertEqual(len(self._sent_chunks), 3)
982        self.assertEqual(self._received_data(), b'hello world')
983        self.assertEqual(self._sent_chunks[1].data, b'hello wo')
984        self.assertEqual(self._sent_chunks[2].data, b'rld')
985
986    def test_write_transfer_multiple_parameters(self) -> None:
987        manager = pw_transfer.Manager(
988            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
989        )
990
991        self._enqueue_server_responses(
992            _Method.WRITE,
993            (
994                (
995                    transfer_pb2.Chunk(
996                        transfer_id=4,
997                        offset=0,
998                        pending_bytes=8,
999                        max_chunk_size_bytes=8,
1000                    ),
1001                ),
1002                (
1003                    transfer_pb2.Chunk(
1004                        transfer_id=4,
1005                        offset=8,
1006                        pending_bytes=8,
1007                        max_chunk_size_bytes=8,
1008                    ),
1009                ),
1010                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1011            ),
1012        )
1013
1014        manager.write(4, b'data to write')
1015        self.assertEqual(len(self._sent_chunks), 3)
1016        self.assertEqual(self._received_data(), b'data to write')
1017        self.assertEqual(self._sent_chunks[1].data, b'data to ')
1018        self.assertEqual(self._sent_chunks[2].data, b'write')
1019
1020    def test_write_transfer_progress_callback(self) -> None:
1021        manager = pw_transfer.Manager(
1022            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1023        )
1024
1025        self._enqueue_server_responses(
1026            _Method.WRITE,
1027            (
1028                (
1029                    transfer_pb2.Chunk(
1030                        transfer_id=4,
1031                        offset=0,
1032                        pending_bytes=8,
1033                        max_chunk_size_bytes=8,
1034                    ),
1035                ),
1036                (
1037                    transfer_pb2.Chunk(
1038                        transfer_id=4,
1039                        offset=8,
1040                        pending_bytes=8,
1041                        max_chunk_size_bytes=8,
1042                    ),
1043                ),
1044                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1045            ),
1046        )
1047
1048        progress: list[pw_transfer.ProgressStats] = []
1049
1050        manager.write(4, b'data to write', progress.append)
1051        self.assertEqual(len(self._sent_chunks), 3)
1052        self.assertEqual(self._received_data(), b'data to write')
1053        self.assertEqual(self._sent_chunks[1].data, b'data to ')
1054        self.assertEqual(self._sent_chunks[2].data, b'write')
1055        self.assertEqual(
1056            progress,
1057            [
1058                pw_transfer.ProgressStats(8, 0, 13),
1059                pw_transfer.ProgressStats(13, 8, 13),
1060                pw_transfer.ProgressStats(13, 13, 13),
1061            ],
1062        )
1063
1064    def test_write_transfer_rewind(self) -> None:
1065        """Write transfer in which the server re-requests an earlier offset."""
1066        manager = pw_transfer.Manager(
1067            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1068        )
1069
1070        self._enqueue_server_responses(
1071            _Method.WRITE,
1072            (
1073                (
1074                    transfer_pb2.Chunk(
1075                        transfer_id=4,
1076                        offset=0,
1077                        pending_bytes=8,
1078                        max_chunk_size_bytes=8,
1079                    ),
1080                ),
1081                (
1082                    transfer_pb2.Chunk(
1083                        transfer_id=4,
1084                        offset=8,
1085                        pending_bytes=8,
1086                        max_chunk_size_bytes=8,
1087                    ),
1088                ),
1089                (
1090                    transfer_pb2.Chunk(
1091                        transfer_id=4,
1092                        offset=4,  # rewind
1093                        pending_bytes=8,
1094                        max_chunk_size_bytes=8,
1095                    ),
1096                ),
1097                (
1098                    transfer_pb2.Chunk(
1099                        transfer_id=4,
1100                        offset=12,
1101                        pending_bytes=16,  # update max size
1102                        max_chunk_size_bytes=16,
1103                    ),
1104                ),
1105                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1106            ),
1107        )
1108
1109        manager.write(4, b'pigweed data transfer')
1110        self.assertEqual(len(self._sent_chunks), 5)
1111        self.assertEqual(self._sent_chunks[1].data, b'pigweed ')
1112        self.assertEqual(self._sent_chunks[2].data, b'data tra')
1113        self.assertEqual(self._sent_chunks[3].data, b'eed data')
1114        self.assertEqual(self._sent_chunks[4].data, b' transfer')
1115
1116    def test_write_transfer_bad_offset(self) -> None:
1117        manager = pw_transfer.Manager(
1118            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1119        )
1120
1121        self._enqueue_server_responses(
1122            _Method.WRITE,
1123            (
1124                (
1125                    transfer_pb2.Chunk(
1126                        transfer_id=4,
1127                        offset=0,
1128                        pending_bytes=8,
1129                        max_chunk_size_bytes=8,
1130                    ),
1131                ),
1132                (
1133                    transfer_pb2.Chunk(
1134                        transfer_id=4,
1135                        offset=100,  # larger offset than data
1136                        pending_bytes=8,
1137                        max_chunk_size_bytes=8,
1138                    ),
1139                ),
1140                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1141            ),
1142        )
1143
1144        with self.assertRaises(pw_transfer.Error) as context:
1145            manager.write(4, b'small data')
1146
1147        exception = context.exception
1148        self.assertEqual(exception.resource_id, 4)
1149        self.assertEqual(exception.status, Status.OUT_OF_RANGE)
1150
1151    def test_write_transfer_error(self) -> None:
1152        manager = pw_transfer.Manager(
1153            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1154        )
1155
1156        self._enqueue_server_responses(
1157            _Method.WRITE,
1158            (
1159                (
1160                    transfer_pb2.Chunk(
1161                        transfer_id=21, status=Status.UNAVAILABLE.value
1162                    ),
1163                ),
1164            ),
1165        )
1166
1167        with self.assertRaises(pw_transfer.Error) as context:
1168            manager.write(21, b'no write')
1169
1170        exception = context.exception
1171        self.assertEqual(exception.resource_id, 21)
1172        self.assertEqual(exception.status, Status.UNAVAILABLE)
1173
1174    def test_write_transfer_server_error(self) -> None:
1175        manager = pw_transfer.Manager(
1176            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1177        )
1178
1179        self._enqueue_server_error(_Method.WRITE, Status.NOT_FOUND)
1180
1181        with self.assertRaises(pw_transfer.Error) as context:
1182            manager.write(21, b'server error')
1183
1184        exception = context.exception
1185        self.assertEqual(exception.resource_id, 21)
1186        self.assertEqual(exception.status, Status.INTERNAL)
1187
1188    def test_write_transfer_timeout_after_initial_chunk(self) -> None:
1189        manager = pw_transfer.Manager(
1190            self._service,
1191            default_response_timeout_s=0.001,
1192            max_retries=2,
1193            default_protocol_version=ProtocolVersion.LEGACY,
1194        )
1195
1196        with self.assertRaises(pw_transfer.Error) as context:
1197            manager.write(22, b'no server response!')
1198
1199        self.assertEqual(
1200            self._sent_chunks,
1201            [
1202                transfer_pb2.Chunk(
1203                    transfer_id=22,
1204                    resource_id=22,
1205                    type=transfer_pb2.Chunk.Type.START,
1206                ),  # initial chunk
1207                transfer_pb2.Chunk(
1208                    transfer_id=22,
1209                    resource_id=22,
1210                    type=transfer_pb2.Chunk.Type.START,
1211                ),  # retry 1
1212                transfer_pb2.Chunk(
1213                    transfer_id=22,
1214                    resource_id=22,
1215                    type=transfer_pb2.Chunk.Type.START,
1216                ),  # retry 2
1217            ],
1218        )
1219
1220        exception = context.exception
1221        self.assertEqual(exception.resource_id, 22)
1222        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
1223
1224    def test_write_transfer_timeout_after_intermediate_chunk(self) -> None:
1225        """Tests write transfers that timeout after the initial chunk."""
1226        manager = pw_transfer.Manager(
1227            self._service,
1228            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1229            max_retries=2,
1230            default_protocol_version=ProtocolVersion.LEGACY,
1231        )
1232
1233        self._enqueue_server_responses(
1234            _Method.WRITE,
1235            [
1236                [
1237                    transfer_pb2.Chunk(
1238                        transfer_id=22, pending_bytes=10, max_chunk_size_bytes=5
1239                    )
1240                ]
1241            ],
1242        )
1243
1244        with self.assertRaises(pw_transfer.Error) as context:
1245            manager.write(22, b'0123456789')
1246
1247        last_data_chunk = transfer_pb2.Chunk(
1248            transfer_id=22,
1249            data=b'56789',
1250            offset=5,
1251            remaining_bytes=0,
1252            type=transfer_pb2.Chunk.Type.DATA,
1253        )
1254
1255        self.assertEqual(
1256            self._sent_chunks,
1257            [
1258                transfer_pb2.Chunk(
1259                    transfer_id=22,
1260                    resource_id=22,
1261                    type=transfer_pb2.Chunk.Type.START,
1262                ),
1263                transfer_pb2.Chunk(
1264                    transfer_id=22,
1265                    data=b'01234',
1266                    type=transfer_pb2.Chunk.Type.DATA,
1267                ),
1268                last_data_chunk,  # last chunk
1269                last_data_chunk,  # retry 1
1270                last_data_chunk,  # retry 2
1271            ],
1272        )
1273
1274        exception = context.exception
1275        self.assertEqual(exception.resource_id, 22)
1276        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
1277
1278    def test_write_zero_pending_bytes_is_internal_error(self) -> None:
1279        manager = pw_transfer.Manager(
1280            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1281        )
1282
1283        self._enqueue_server_responses(
1284            _Method.WRITE,
1285            ((transfer_pb2.Chunk(transfer_id=23, pending_bytes=0),),),
1286        )
1287
1288        with self.assertRaises(pw_transfer.Error) as context:
1289            manager.write(23, b'no write')
1290
1291        exception = context.exception
1292        self.assertEqual(exception.resource_id, 23)
1293        self.assertEqual(exception.status, Status.INTERNAL)
1294
1295    def test_v2_read_transfer_basic(self) -> None:
1296        """Tests a simple protocol version 2 read transfer."""
1297        manager = pw_transfer.Manager(
1298            self._service,
1299            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1300            default_protocol_version=ProtocolVersion.VERSION_TWO,
1301        )
1302
1303        self._enqueue_server_responses(
1304            _Method.READ,
1305            (
1306                (
1307                    transfer_pb2.Chunk(
1308                        resource_id=39,
1309                        session_id=_FIRST_SESSION_ID,
1310                        type=transfer_pb2.Chunk.Type.START_ACK,
1311                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1312                    ),
1313                ),
1314                (
1315                    transfer_pb2.Chunk(
1316                        session_id=_FIRST_SESSION_ID,
1317                        type=transfer_pb2.Chunk.Type.DATA,
1318                        offset=0,
1319                        data=b'version two',
1320                        remaining_bytes=0,
1321                    ),
1322                ),
1323                (
1324                    transfer_pb2.Chunk(
1325                        session_id=_FIRST_SESSION_ID,
1326                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1327                    ),
1328                ),
1329            ),
1330        )
1331
1332        data = manager.read(39)
1333
1334        self.assertEqual(
1335            self._sent_chunks,
1336            [
1337                transfer_pb2.Chunk(
1338                    transfer_id=39,
1339                    resource_id=39,
1340                    desired_session_id=_FIRST_SESSION_ID,
1341                    pending_bytes=1024,
1342                    max_chunk_size_bytes=1024,
1343                    window_end_offset=1024,
1344                    type=transfer_pb2.Chunk.Type.START,
1345                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1346                ),
1347                transfer_pb2.Chunk(
1348                    session_id=_FIRST_SESSION_ID,
1349                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1350                    max_chunk_size_bytes=1024,
1351                    window_end_offset=1024,
1352                    # pending_bytes should no longer exist as server and client
1353                    # have agreed on v2.
1354                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1355                ),
1356                transfer_pb2.Chunk(
1357                    session_id=_FIRST_SESSION_ID,
1358                    type=transfer_pb2.Chunk.Type.COMPLETION,
1359                    status=Status.OK.value,
1360                ),
1361            ],
1362        )
1363
1364        self.assertEqual(data, b'version two')
1365
1366    def test_v2_read_transfer_legacy_fallback(self) -> None:
1367        """Tests a v2 read transfer when the server only supports legacy."""
1368        manager = pw_transfer.Manager(
1369            self._service,
1370            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1371            default_protocol_version=ProtocolVersion.VERSION_TWO,
1372        )
1373
1374        # Respond to the START chunk with a legacy data transfer chunk instead
1375        # of a START_ACK.
1376        self._enqueue_server_responses(
1377            _Method.READ,
1378            (
1379                (
1380                    transfer_pb2.Chunk(
1381                        transfer_id=40,
1382                        type=transfer_pb2.Chunk.Type.DATA,
1383                        offset=0,
1384                        data=b'sorry, legacy only',
1385                        remaining_bytes=0,
1386                    ),
1387                ),
1388            ),
1389        )
1390
1391        data = manager.read(40)
1392
1393        self.assertEqual(
1394            self._sent_chunks,
1395            [
1396                transfer_pb2.Chunk(
1397                    transfer_id=40,
1398                    resource_id=40,
1399                    desired_session_id=_FIRST_SESSION_ID,
1400                    pending_bytes=1024,
1401                    max_chunk_size_bytes=1024,
1402                    window_end_offset=1024,
1403                    type=transfer_pb2.Chunk.Type.START,
1404                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1405                ),
1406                transfer_pb2.Chunk(
1407                    transfer_id=40,
1408                    type=transfer_pb2.Chunk.Type.COMPLETION,
1409                    status=Status.OK.value,
1410                ),
1411            ],
1412        )
1413
1414        self.assertEqual(data, b'sorry, legacy only')
1415
1416    def test_v2_read_transfer_re_sent_data(self) -> None:
1417        """Tests a simple protocol version 2 read transfer."""
1418        manager = pw_transfer.Manager(
1419            self._service,
1420            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1421            default_protocol_version=ProtocolVersion.VERSION_TWO,
1422        )
1423
1424        self._enqueue_server_responses(
1425            _Method.READ,
1426            (
1427                (
1428                    transfer_pb2.Chunk(
1429                        resource_id=39,
1430                        session_id=_FIRST_SESSION_ID,
1431                        type=transfer_pb2.Chunk.Type.START_ACK,
1432                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1433                    ),
1434                ),
1435                (
1436                    transfer_pb2.Chunk(
1437                        session_id=_FIRST_SESSION_ID,
1438                        type=transfer_pb2.Chunk.Type.DATA,
1439                        offset=0,
1440                        data=b'01234567',
1441                    ),
1442                    # Re-send already transmitted data.
1443                    transfer_pb2.Chunk(
1444                        session_id=_FIRST_SESSION_ID,
1445                        type=transfer_pb2.Chunk.Type.DATA,
1446                        offset=4,
1447                        data=b'4567',
1448                    ),
1449                ),
1450                (
1451                    transfer_pb2.Chunk(
1452                        session_id=_FIRST_SESSION_ID,
1453                        type=transfer_pb2.Chunk.Type.DATA,
1454                        offset=8,
1455                        data=b'89abcdef',
1456                        remaining_bytes=0,
1457                    ),
1458                ),
1459                (
1460                    transfer_pb2.Chunk(
1461                        session_id=_FIRST_SESSION_ID,
1462                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1463                    ),
1464                ),
1465            ),
1466        )
1467
1468        data = manager.read(39)
1469
1470        self.assertEqual(
1471            self._sent_chunks,
1472            [
1473                transfer_pb2.Chunk(
1474                    transfer_id=39,
1475                    resource_id=39,
1476                    desired_session_id=_FIRST_SESSION_ID,
1477                    pending_bytes=1024,
1478                    max_chunk_size_bytes=1024,
1479                    window_end_offset=1024,
1480                    type=transfer_pb2.Chunk.Type.START,
1481                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1482                ),
1483                transfer_pb2.Chunk(
1484                    session_id=_FIRST_SESSION_ID,
1485                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1486                    max_chunk_size_bytes=1024,
1487                    window_end_offset=1024,
1488                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1489                ),
1490                # Should send a continue chunk in response to retransmission.
1491                transfer_pb2.Chunk(
1492                    session_id=_FIRST_SESSION_ID,
1493                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
1494                    offset=8,
1495                    max_chunk_size_bytes=1024,
1496                    window_end_offset=1024,
1497                ),
1498                transfer_pb2.Chunk(
1499                    session_id=_FIRST_SESSION_ID,
1500                    type=transfer_pb2.Chunk.Type.COMPLETION,
1501                    status=Status.OK.value,
1502                ),
1503            ],
1504        )
1505
1506        self.assertEqual(data, b'0123456789abcdef')
1507
1508    def test_v2_write_transfer_basic(self) -> None:
1509        """Tests a simple protocol version 2 write transfer."""
1510        manager = pw_transfer.Manager(
1511            self._service,
1512            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1513            default_protocol_version=ProtocolVersion.VERSION_TWO,
1514        )
1515
1516        self._enqueue_server_responses(
1517            _Method.WRITE,
1518            (
1519                (
1520                    transfer_pb2.Chunk(
1521                        resource_id=72,
1522                        session_id=_FIRST_SESSION_ID,
1523                        type=transfer_pb2.Chunk.Type.START_ACK,
1524                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1525                    ),
1526                ),
1527                (
1528                    transfer_pb2.Chunk(
1529                        session_id=_FIRST_SESSION_ID,
1530                        type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
1531                        offset=0,
1532                        window_end_offset=32,
1533                        max_chunk_size_bytes=8,
1534                    ),
1535                ),
1536                (),  # In response to the first data chunk.
1537                (
1538                    transfer_pb2.Chunk(
1539                        session_id=_FIRST_SESSION_ID,
1540                        type=transfer_pb2.Chunk.Type.COMPLETION,
1541                        status=Status.OK.value,
1542                    ),
1543                ),
1544            ),
1545        )
1546
1547        manager.write(72, b'write version 2')
1548
1549        self.assertEqual(
1550            self._sent_chunks,
1551            [
1552                transfer_pb2.Chunk(
1553                    transfer_id=72,
1554                    resource_id=72,
1555                    desired_session_id=_FIRST_SESSION_ID,
1556                    type=transfer_pb2.Chunk.Type.START,
1557                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1558                ),
1559                transfer_pb2.Chunk(
1560                    session_id=_FIRST_SESSION_ID,
1561                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1562                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1563                ),
1564                transfer_pb2.Chunk(
1565                    session_id=_FIRST_SESSION_ID,
1566                    type=transfer_pb2.Chunk.Type.DATA,
1567                    offset=0,
1568                    data=b'write ve',
1569                ),
1570                transfer_pb2.Chunk(
1571                    session_id=_FIRST_SESSION_ID,
1572                    type=transfer_pb2.Chunk.Type.DATA,
1573                    offset=8,
1574                    data=b'rsion 2',
1575                    remaining_bytes=0,
1576                ),
1577                transfer_pb2.Chunk(
1578                    session_id=_FIRST_SESSION_ID,
1579                    type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1580                ),
1581            ],
1582        )
1583
1584        self.assertEqual(self._received_data(), b'write version 2')
1585
1586    def test_v2_write_transfer_legacy_fallback(self) -> None:
1587        """Tests a v2 write transfer when the server only supports legacy."""
1588        manager = pw_transfer.Manager(
1589            self._service,
1590            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1591            default_protocol_version=ProtocolVersion.VERSION_TWO,
1592        )
1593
1594        self._enqueue_server_responses(
1595            _Method.WRITE,
1596            (
1597                # Send a parameters chunk immediately per the legacy protocol.
1598                (
1599                    transfer_pb2.Chunk(
1600                        transfer_id=76,
1601                        type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
1602                        offset=0,
1603                        pending_bytes=32,
1604                        window_end_offset=32,
1605                        max_chunk_size_bytes=8,
1606                    ),
1607                ),
1608                (),  # In response to the first data chunk.
1609                (
1610                    transfer_pb2.Chunk(
1611                        transfer_id=76,
1612                        type=transfer_pb2.Chunk.Type.COMPLETION,
1613                        status=Status.OK.value,
1614                    ),
1615                ),
1616            ),
1617        )
1618
1619        manager.write(76, b'write v... NOPE')
1620
1621        self.assertEqual(
1622            self._sent_chunks,
1623            [
1624                transfer_pb2.Chunk(
1625                    transfer_id=76,
1626                    resource_id=76,
1627                    desired_session_id=_FIRST_SESSION_ID,
1628                    type=transfer_pb2.Chunk.Type.START,
1629                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1630                ),
1631                transfer_pb2.Chunk(
1632                    transfer_id=76,
1633                    type=transfer_pb2.Chunk.Type.DATA,
1634                    offset=0,
1635                    data=b'write v.',
1636                ),
1637                transfer_pb2.Chunk(
1638                    transfer_id=76,
1639                    type=transfer_pb2.Chunk.Type.DATA,
1640                    offset=8,
1641                    data=b'.. NOPE',
1642                    remaining_bytes=0,
1643                ),
1644            ],
1645        )
1646
1647        self.assertEqual(self._received_data(), b'write v... NOPE')
1648
1649    def test_v2_server_error(self) -> None:
1650        """Tests a server error occurring during the opening handshake."""
1651
1652        manager = pw_transfer.Manager(
1653            self._service,
1654            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1655            default_protocol_version=ProtocolVersion.VERSION_TWO,
1656        )
1657
1658        self._enqueue_server_responses(
1659            _Method.READ,
1660            (
1661                (
1662                    transfer_pb2.Chunk(
1663                        resource_id=43,
1664                        session_id=_FIRST_SESSION_ID,
1665                        type=transfer_pb2.Chunk.Type.START_ACK,
1666                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1667                    ),
1668                ),
1669                (
1670                    transfer_pb2.Chunk(
1671                        session_id=_FIRST_SESSION_ID,
1672                        type=transfer_pb2.Chunk.Type.COMPLETION,
1673                        status=Status.DATA_LOSS.value,
1674                    ),
1675                ),
1676            ),
1677        )
1678
1679        with self.assertRaises(pw_transfer.Error) as context:
1680            manager.read(43)
1681
1682        self.assertEqual(
1683            self._sent_chunks,
1684            [
1685                transfer_pb2.Chunk(
1686                    transfer_id=43,
1687                    desired_session_id=_FIRST_SESSION_ID,
1688                    resource_id=43,
1689                    pending_bytes=1024,
1690                    max_chunk_size_bytes=1024,
1691                    window_end_offset=1024,
1692                    type=transfer_pb2.Chunk.Type.START,
1693                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1694                ),
1695                transfer_pb2.Chunk(
1696                    session_id=_FIRST_SESSION_ID,
1697                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1698                    max_chunk_size_bytes=1024,
1699                    window_end_offset=1024,
1700                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1701                ),
1702                # Client sends a COMPLETION_ACK in response to the server.
1703                transfer_pb2.Chunk(
1704                    session_id=_FIRST_SESSION_ID,
1705                    type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1706                ),
1707            ],
1708        )
1709
1710        exception = context.exception
1711        self.assertEqual(exception.resource_id, 43)
1712        self.assertEqual(exception.status, Status.DATA_LOSS)
1713
1714    def test_v2_timeout_during_opening_handshake(self) -> None:
1715        """Tests a timeout occurring during the opening handshake."""
1716        manager = pw_transfer.Manager(
1717            self._service,
1718            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1719            default_protocol_version=ProtocolVersion.VERSION_TWO,
1720        )
1721
1722        # Don't enqueue any server responses.
1723
1724        with self.assertRaises(pw_transfer.Error) as context:
1725            manager.read(41)
1726
1727        start_chunk = transfer_pb2.Chunk(
1728            transfer_id=41,
1729            resource_id=41,
1730            desired_session_id=_FIRST_SESSION_ID,
1731            pending_bytes=1024,
1732            max_chunk_size_bytes=1024,
1733            window_end_offset=1024,
1734            type=transfer_pb2.Chunk.Type.START,
1735            protocol_version=ProtocolVersion.VERSION_TWO.value,
1736        )
1737
1738        # The opening chunk should be sent initially, then retried three times.
1739        self.assertEqual(self._sent_chunks, [start_chunk] * 4)
1740
1741        exception = context.exception
1742        self.assertEqual(exception.resource_id, 41)
1743        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
1744
1745    def test_v2_timeout_recovery_during_opening_handshake(self) -> None:
1746        """Tests a timeout during the opening handshake which recovers."""
1747        manager = pw_transfer.Manager(
1748            self._service,
1749            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1750            default_protocol_version=ProtocolVersion.VERSION_TWO,
1751        )
1752
1753        self._enqueue_server_responses(
1754            _Method.WRITE,
1755            (
1756                (
1757                    transfer_pb2.Chunk(
1758                        resource_id=73,
1759                        session_id=_FIRST_SESSION_ID,
1760                        type=transfer_pb2.Chunk.Type.START_ACK,
1761                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1762                    ),
1763                ),
1764                (),  # Don't respond to the START_ACK_CONFIRMATION.
1765                (),  # Don't respond to the first START_ACK_CONFIRMATION retry.
1766                (
1767                    transfer_pb2.Chunk(
1768                        session_id=_FIRST_SESSION_ID,
1769                        type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
1770                        offset=0,
1771                        window_end_offset=32,
1772                        max_chunk_size_bytes=8,
1773                    ),
1774                ),
1775                (),  # In response to the first data chunk.
1776                (
1777                    transfer_pb2.Chunk(
1778                        session_id=_FIRST_SESSION_ID,
1779                        type=transfer_pb2.Chunk.Type.COMPLETION,
1780                        status=Status.OK.value,
1781                    ),
1782                ),
1783            ),
1784        )
1785
1786        manager.write(73, b'write timeout 2')
1787
1788        start_ack_confirmation = transfer_pb2.Chunk(
1789            session_id=_FIRST_SESSION_ID,
1790            type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1791            protocol_version=ProtocolVersion.VERSION_TWO.value,
1792        )
1793
1794        self.assertEqual(
1795            self._sent_chunks,
1796            [
1797                transfer_pb2.Chunk(
1798                    transfer_id=73,
1799                    resource_id=73,
1800                    desired_session_id=_FIRST_SESSION_ID,
1801                    type=transfer_pb2.Chunk.Type.START,
1802                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1803                ),
1804                start_ack_confirmation,  # Initial transmission
1805                start_ack_confirmation,  # Retry 1
1806                start_ack_confirmation,  # Retry 2
1807                transfer_pb2.Chunk(
1808                    session_id=_FIRST_SESSION_ID,
1809                    type=transfer_pb2.Chunk.Type.DATA,
1810                    offset=0,
1811                    data=b'write ti',
1812                ),
1813                transfer_pb2.Chunk(
1814                    session_id=_FIRST_SESSION_ID,
1815                    type=transfer_pb2.Chunk.Type.DATA,
1816                    offset=8,
1817                    data=b'meout 2',
1818                    remaining_bytes=0,
1819                ),
1820                transfer_pb2.Chunk(
1821                    session_id=_FIRST_SESSION_ID,
1822                    type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1823                ),
1824            ],
1825        )
1826
1827        self.assertEqual(self._received_data(), b'write timeout 2')
1828
1829    def test_v2_closing_handshake_bad_chunk(self) -> None:
1830        """Tests an unexpected chunk response during the closing handshake."""
1831        manager = pw_transfer.Manager(
1832            self._service,
1833            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1834            default_protocol_version=ProtocolVersion.VERSION_TWO,
1835        )
1836
1837        self._enqueue_server_responses(
1838            _Method.READ,
1839            (
1840                (
1841                    transfer_pb2.Chunk(
1842                        resource_id=47,
1843                        session_id=_FIRST_SESSION_ID,
1844                        type=transfer_pb2.Chunk.Type.START_ACK,
1845                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1846                    ),
1847                ),
1848                (
1849                    transfer_pb2.Chunk(
1850                        session_id=_FIRST_SESSION_ID,
1851                        type=transfer_pb2.Chunk.Type.DATA,
1852                        offset=0,
1853                        data=b'version two',
1854                        remaining_bytes=0,
1855                    ),
1856                ),
1857                # In response to the COMPLETION, re-send the last chunk instead
1858                # of a COMPLETION_ACK.
1859                (
1860                    transfer_pb2.Chunk(
1861                        session_id=_FIRST_SESSION_ID,
1862                        type=transfer_pb2.Chunk.Type.DATA,
1863                        offset=0,
1864                        data=b'version two',
1865                        remaining_bytes=0,
1866                    ),
1867                ),
1868                (
1869                    transfer_pb2.Chunk(
1870                        session_id=_FIRST_SESSION_ID,
1871                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1872                    ),
1873                ),
1874            ),
1875        )
1876
1877        data = manager.read(47)
1878
1879        self.assertEqual(
1880            self._sent_chunks,
1881            [
1882                transfer_pb2.Chunk(
1883                    transfer_id=47,
1884                    resource_id=47,
1885                    desired_session_id=_FIRST_SESSION_ID,
1886                    pending_bytes=1024,
1887                    max_chunk_size_bytes=1024,
1888                    window_end_offset=1024,
1889                    type=transfer_pb2.Chunk.Type.START,
1890                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1891                ),
1892                transfer_pb2.Chunk(
1893                    session_id=_FIRST_SESSION_ID,
1894                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1895                    max_chunk_size_bytes=1024,
1896                    window_end_offset=1024,
1897                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1898                ),
1899                transfer_pb2.Chunk(
1900                    session_id=_FIRST_SESSION_ID,
1901                    type=transfer_pb2.Chunk.Type.COMPLETION,
1902                    status=Status.OK.value,
1903                ),
1904                # Completion should be re-sent following the repeated chunk.
1905                transfer_pb2.Chunk(
1906                    session_id=_FIRST_SESSION_ID,
1907                    type=transfer_pb2.Chunk.Type.COMPLETION,
1908                    status=Status.OK.value,
1909                ),
1910            ],
1911        )
1912
1913        self.assertEqual(data, b'version two')
1914
1915    def test_v2_timeout_during_closing_handshake(self) -> None:
1916        """Tests a timeout occurring during the closing handshake."""
1917        manager = pw_transfer.Manager(
1918            self._service,
1919            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1920            default_protocol_version=ProtocolVersion.VERSION_TWO,
1921        )
1922
1923        self._enqueue_server_responses(
1924            _Method.READ,
1925            (
1926                (
1927                    transfer_pb2.Chunk(
1928                        resource_id=47,
1929                        session_id=_FIRST_SESSION_ID,
1930                        type=transfer_pb2.Chunk.Type.START_ACK,
1931                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1932                    ),
1933                ),
1934                (
1935                    transfer_pb2.Chunk(
1936                        session_id=_FIRST_SESSION_ID,
1937                        type=transfer_pb2.Chunk.Type.DATA,
1938                        offset=0,
1939                        data=b'dropped completion',
1940                        remaining_bytes=0,
1941                    ),
1942                ),
1943                # Never send the expected COMPLETION_ACK chunk.
1944            ),
1945        )
1946
1947        data = manager.read(47)
1948
1949        self.assertEqual(
1950            self._sent_chunks,
1951            [
1952                transfer_pb2.Chunk(
1953                    transfer_id=47,
1954                    resource_id=47,
1955                    desired_session_id=_FIRST_SESSION_ID,
1956                    pending_bytes=1024,
1957                    max_chunk_size_bytes=1024,
1958                    window_end_offset=1024,
1959                    type=transfer_pb2.Chunk.Type.START,
1960                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1961                ),
1962                transfer_pb2.Chunk(
1963                    session_id=_FIRST_SESSION_ID,
1964                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1965                    max_chunk_size_bytes=1024,
1966                    window_end_offset=1024,
1967                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1968                ),
1969                transfer_pb2.Chunk(
1970                    session_id=_FIRST_SESSION_ID,
1971                    type=transfer_pb2.Chunk.Type.COMPLETION,
1972                    status=Status.OK.value,
1973                ),
1974                # The completion should be retried per the usual retry flow.
1975                transfer_pb2.Chunk(
1976                    session_id=_FIRST_SESSION_ID,
1977                    type=transfer_pb2.Chunk.Type.COMPLETION,
1978                    status=Status.OK.value,
1979                ),
1980                transfer_pb2.Chunk(
1981                    session_id=_FIRST_SESSION_ID,
1982                    type=transfer_pb2.Chunk.Type.COMPLETION,
1983                    status=Status.OK.value,
1984                ),
1985                transfer_pb2.Chunk(
1986                    session_id=_FIRST_SESSION_ID,
1987                    type=transfer_pb2.Chunk.Type.COMPLETION,
1988                    status=Status.OK.value,
1989                ),
1990            ],
1991        )
1992
1993        # Despite timing out following several retries, the transfer should
1994        # still conclude successfully, as failing to receive a COMPLETION_ACK
1995        # is not fatal.
1996        self.assertEqual(data, b'dropped completion')
1997
1998
class ProgressStatsTest(unittest.TestCase):
    """Tests for pw_transfer.ProgressStats percentages and string output."""

    def test_received_percent_known_total(self) -> None:
        """percent_received() yields the expected value for a known total."""
        cases = (
            ((75, 0, 100), 0.0),
            ((75, 50, 100), 50.0),
            ((100, 100, 100), 100.0),
        )
        for constructor_args, expected_percent in cases:
            stats = pw_transfer.ProgressStats(*constructor_args)
            self.assertEqual(stats.percent_received(), expected_percent)

    def test_received_percent_unknown_total(self) -> None:
        """percent_received() is NaN when the total is None."""
        for constructor_args in ((75, 50, None), (100, 100, None)):
            percent = pw_transfer.ProgressStats(
                *constructor_args
            ).percent_received()
            self.assertTrue(math.isnan(percent))

    def test_str_known_total(self) -> None:
        """str() mentions each of the three constructor values."""
        text = str(pw_transfer.ProgressStats(75, 50, 100))
        for fragment in ('75', '50', '100'):
            self.assertIn(fragment, text)

    def test_str_unknown_total(self) -> None:
        """str() reports 'unknown' when the total is None."""
        text = str(pw_transfer.ProgressStats(75, 50, None))
        for fragment in ('75', '50', 'unknown'):
            self.assertIn(fragment, text)
2034
2035
if __name__ == '__main__':
    # TODO: b/265975025 - Only run this test in upstream Pigweed until the
    #     occasional hangs are fixed.
    # The tests run only when PW_ROOT is set, non-empty, and equal to
    # PW_PROJECT_ROOT.
    pw_root = os.environ.get('PW_ROOT')
    if pw_root and pw_root == os.environ.get('PW_PROJECT_ROOT'):
        unittest.main()
    else:
        print('Skipping transfer_test.py due to possible hangs (b/265975025).')
2045