1#!/usr/bin/env python3 2# Copyright 2023 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Tests for the transfer service client.""" 16 17import enum 18import math 19import os 20import unittest 21from typing import Iterable 22 23from pw_status import Status 24from pw_rpc import callback_client, client, ids, packets 25from pw_rpc.internal import packet_pb2 26 27import pw_transfer 28from pw_transfer import ProtocolVersion 29 30from pw_transfer import transfer_pb2 31 32_TRANSFER_SERVICE_ID = ids.calculate('pw.transfer.Transfer') 33_FIRST_SESSION_ID = 1 34_ARBITRARY_TRANSFER_ID = 66 35 36# If the default timeout is too short, some tests become flaky on Windows. 37DEFAULT_TIMEOUT_S = 0.3 38 39 40class _Method(enum.Enum): 41 READ = ids.calculate('Read') 42 WRITE = ids.calculate('Write') 43 44 45# pylint: disable=missing-function-docstring, missing-class-docstring 46 47 48class TransferManagerTest(unittest.TestCase): 49 # pylint: disable=too-many-public-methods 50 """Tests for the transfer manager.""" 51 52 def setUp(self) -> None: 53 self._client = client.Client.from_modules( 54 callback_client.Impl(), 55 [client.Channel(1, self._handle_request)], 56 (transfer_pb2,), 57 ) 58 self._service = self._client.channel(1).rpcs.pw.transfer.Transfer 59 60 self._sent_chunks: list[transfer_pb2.Chunk] = [] 61 self._packets_to_send: list[list[packet_pb2.RpcPacket]] = [] 62 63 def _enqueue_server_responses( 64 self, method: _Method, responses: Iterable[Iterable[transfer_pb2.Chunk]] 65 ) -> None: 66 for group in responses: 67 serialized_group = [] 68 for response in group: 69 serialized_group.append( 70 packet_pb2.RpcPacket( 71 type=packet_pb2.PacketType.SERVER_STREAM, 72 channel_id=1, 73 service_id=_TRANSFER_SERVICE_ID, 74 method_id=method.value, 75 status=Status.OK.value, 76 payload=response.SerializeToString(), 77 ) 78 ) 79 self._packets_to_send.append(serialized_group) 80 81 def _enqueue_server_error(self, method: _Method, error: Status) -> None: 82 self._packets_to_send.append( 83 [ 84 packet_pb2.RpcPacket( 85 type=packet_pb2.PacketType.SERVER_ERROR, 86 channel_id=1, 87 service_id=_TRANSFER_SERVICE_ID, 88 method_id=method.value, 89 status=error.value, 90 ) 91 ] 92 ) 93 94 def _handle_request(self, data: bytes) -> None: 95 packet = packets.decode(data) 96 if packet.type is not packet_pb2.PacketType.CLIENT_STREAM: 97 return 98 99 chunk = transfer_pb2.Chunk() 100 chunk.MergeFromString(packet.payload) 101 self._sent_chunks.append(chunk) 102 103 if self._packets_to_send: 104 responses = self._packets_to_send.pop(0) 105 for response in responses: 106 response.call_id = packet.call_id 107 self._client.process_packet(response.SerializeToString()) 108 109 def _received_data(self) -> bytearray: 110 data = bytearray() 111 for chunk in self._sent_chunks: 112 data.extend(chunk.data) 113 return data 114 115 def test_read_transfer_basic(self): 116 manager = pw_transfer.Manager( 117 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 118 ) 119 120 self._enqueue_server_responses( 121 _Method.READ, 122 ( 123 ( 124 transfer_pb2.Chunk( 125 transfer_id=3, offset=0, data=b'abc', remaining_bytes=0 126 ), 127 ), 128 ), 129 ) 130 131 data = manager.read(3) 132 self.assertEqual(data, b'abc') 133 self.assertEqual(len(self._sent_chunks), 2) 134 self.assertTrue(self._sent_chunks[-1].HasField('status')) 135 self.assertEqual(self._sent_chunks[-1].status, 0) 136 137 def test_read_transfer_multichunk(self) -> None: 138 manager = pw_transfer.Manager( 139 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 140 ) 141 142 self._enqueue_server_responses( 143 _Method.READ, 144 ( 145 ( 146 transfer_pb2.Chunk( 147 transfer_id=3, offset=0, data=b'abc', remaining_bytes=3 148 ), 149 transfer_pb2.Chunk( 150 transfer_id=3, offset=3, data=b'def', remaining_bytes=0 151 ), 152 ), 153 ), 154 ) 155 156 data = manager.read(3) 157 self.assertEqual(data, b'abcdef') 158 self.assertEqual(len(self._sent_chunks), 2) 159 self.assertTrue(self._sent_chunks[-1].HasField('status')) 160 self.assertEqual(self._sent_chunks[-1].status, 0) 161 162 def test_read_transfer_progress_callback(self) -> None: 163 manager = pw_transfer.Manager( 164 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 165 ) 166 167 self._enqueue_server_responses( 168 _Method.READ, 169 ( 170 ( 171 transfer_pb2.Chunk( 172 transfer_id=3, offset=0, data=b'abc', remaining_bytes=3 173 ), 174 transfer_pb2.Chunk( 175 transfer_id=3, offset=3, data=b'def', remaining_bytes=0 176 ), 177 ), 178 ), 179 ) 180 181 progress: list[pw_transfer.ProgressStats] = [] 182 183 data = manager.read(3, progress.append) 184 self.assertEqual(data, b'abcdef') 185 self.assertEqual(len(self._sent_chunks), 2) 186 self.assertTrue(self._sent_chunks[-1].HasField('status')) 187 self.assertEqual(self._sent_chunks[-1].status, 0) 188 self.assertEqual( 189 progress, 190 [ 191 pw_transfer.ProgressStats(3, 3, 6), 192 pw_transfer.ProgressStats(6, 6, 6), 193 ], 194 ) 195 196 def test_read_transfer_retry_bad_offset(self) -> None: 197 """Server responds with an unexpected offset in a read transfer.""" 198 manager = pw_transfer.Manager( 199 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 200 ) 201 202 self._enqueue_server_responses( 203 _Method.READ, 204 ( 205 ( 206 transfer_pb2.Chunk( 207 transfer_id=3, offset=0, data=b'123', remaining_bytes=6 208 ), 209 # Incorrect offset; expecting 3. 210 transfer_pb2.Chunk( 211 transfer_id=3, offset=1, data=b'456', remaining_bytes=3 212 ), 213 ), 214 ( 215 transfer_pb2.Chunk( 216 transfer_id=3, offset=3, data=b'456', remaining_bytes=3 217 ), 218 transfer_pb2.Chunk( 219 transfer_id=3, offset=6, data=b'789', remaining_bytes=0 220 ), 221 ), 222 ), 223 ) 224 225 data = manager.read(3) 226 self.assertEqual(data, b'123456789') 227 228 # Two transfer parameter requests should have been sent. 229 self.assertEqual(len(self._sent_chunks), 3) 230 self.assertTrue(self._sent_chunks[-1].HasField('status')) 231 self.assertEqual(self._sent_chunks[-1].status, 0) 232 233 def test_read_transfer_recovery_sends_parameters_on_retry(self) -> None: 234 """Server sends the same chunk twice (retry) in a read transfer.""" 235 manager = pw_transfer.Manager( 236 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 237 ) 238 239 self._enqueue_server_responses( 240 _Method.READ, 241 ( 242 ( 243 # Bad offset, enter recovery state. Only one parameters 244 # chunk should be sent. 245 transfer_pb2.Chunk( 246 transfer_id=3, offset=1, data=b'234', remaining_bytes=5 247 ), 248 transfer_pb2.Chunk( 249 transfer_id=3, offset=4, data=b'567', remaining_bytes=2 250 ), 251 transfer_pb2.Chunk( 252 transfer_id=3, offset=7, data=b'8', remaining_bytes=1 253 ), 254 ), 255 ( 256 # Only one parameters chunk should be sent after the server 257 # retries the same offset twice. 258 transfer_pb2.Chunk( 259 transfer_id=3, offset=1, data=b'234', remaining_bytes=5 260 ), 261 transfer_pb2.Chunk( 262 transfer_id=3, offset=4, data=b'567', remaining_bytes=2 263 ), 264 transfer_pb2.Chunk( 265 transfer_id=3, offset=7, data=b'8', remaining_bytes=1 266 ), 267 transfer_pb2.Chunk( 268 transfer_id=3, offset=7, data=b'8', remaining_bytes=1 269 ), 270 ), 271 ( 272 transfer_pb2.Chunk( 273 transfer_id=3, 274 offset=0, 275 data=b'123456789', 276 remaining_bytes=0, 277 ), 278 ), 279 ), 280 ) 281 282 data = manager.read(3) 283 self.assertEqual(data, b'123456789') 284 285 self.assertEqual(len(self._sent_chunks), 4) 286 self.assertEqual( 287 self._sent_chunks[0].type, transfer_pb2.Chunk.Type.START 288 ) 289 self.assertEqual(self._sent_chunks[0].offset, 0) 290 self.assertEqual( 291 self._sent_chunks[1].type, 292 transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 293 ) 294 self.assertEqual(self._sent_chunks[1].offset, 0) 295 self.assertEqual( 296 self._sent_chunks[2].type, 297 transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 298 ) 299 self.assertEqual(self._sent_chunks[2].offset, 0) 300 self.assertEqual( 301 self._sent_chunks[3].type, transfer_pb2.Chunk.Type.COMPLETION 302 ) 303 304 def test_read_transfer_retry_timeout(self) -> None: 305 """Server doesn't respond to read transfer parameters.""" 306 manager = pw_transfer.Manager( 307 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 308 ) 309 310 self._enqueue_server_responses( 311 _Method.READ, 312 ( 313 (), # Send nothing in response to the initial parameters. 314 ( 315 transfer_pb2.Chunk( 316 transfer_id=3, offset=0, data=b'xyz', remaining_bytes=0 317 ), 318 ), 319 ), 320 ) 321 322 data = manager.read(3) 323 self.assertEqual(data, b'xyz') 324 325 # Two transfer parameter requests should have been sent. 326 self.assertEqual(len(self._sent_chunks), 3) 327 self.assertTrue(self._sent_chunks[-1].HasField('status')) 328 self.assertEqual(self._sent_chunks[-1].status, 0) 329 330 def test_read_transfer_lifetime_retries(self) -> None: 331 """Server doesn't respond several times during the transfer.""" 332 manager = pw_transfer.Manager( 333 self._service, 334 default_response_timeout_s=DEFAULT_TIMEOUT_S, 335 max_retries=2**32 - 1, 336 max_lifetime_retries=4, 337 ) 338 339 self._enqueue_server_responses( 340 _Method.READ, 341 ( 342 (), # Retry 1 343 (), # Retry 2 344 ( 345 transfer_pb2.Chunk( # Expected chunk. 346 transfer_id=43, offset=0, data=b'xyz' 347 ), 348 ), 349 # Don't send anything else. The maximum lifetime retry count 350 # should be hit. 351 ), 352 ) 353 354 with self.assertRaises(pw_transfer.Error) as context: 355 manager.read(43) 356 357 self.assertEqual(len(self._sent_chunks), 5) 358 359 exception = context.exception 360 self.assertEqual(exception.resource_id, 43) 361 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 362 363 def test_read_transfer_timeout(self) -> None: 364 manager = pw_transfer.Manager( 365 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 366 ) 367 368 with self.assertRaises(pw_transfer.Error) as context: 369 manager.read(27) 370 371 exception = context.exception 372 self.assertEqual(exception.resource_id, 27) 373 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 374 375 # The client should have sent four transfer parameters requests: one 376 # initial, and three retries. 377 self.assertEqual(len(self._sent_chunks), 4) 378 379 def test_read_transfer_error(self) -> None: 380 manager = pw_transfer.Manager( 381 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 382 ) 383 384 self._enqueue_server_responses( 385 _Method.READ, 386 ( 387 ( 388 transfer_pb2.Chunk( 389 transfer_id=31, status=Status.NOT_FOUND.value 390 ), 391 ), 392 ), 393 ) 394 395 with self.assertRaises(pw_transfer.Error) as context: 396 manager.read(31) 397 398 exception = context.exception 399 self.assertEqual(exception.resource_id, 31) 400 self.assertEqual(exception.status, Status.NOT_FOUND) 401 402 def test_read_transfer_server_error(self) -> None: 403 manager = pw_transfer.Manager( 404 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 405 ) 406 407 self._enqueue_server_error(_Method.READ, Status.NOT_FOUND) 408 409 with self.assertRaises(pw_transfer.Error) as context: 410 manager.read(31) 411 412 exception = context.exception 413 self.assertEqual(exception.resource_id, 31) 414 self.assertEqual(exception.status, Status.INTERNAL) 415 416 def test_read_transfer_adaptive_window_slow_start(self) -> None: 417 test_max_chunk_size = 16 418 419 manager = pw_transfer.Manager( 420 self._service, 421 default_response_timeout_s=DEFAULT_TIMEOUT_S, 422 max_chunk_size_bytes=test_max_chunk_size, 423 default_protocol_version=ProtocolVersion.LEGACY, 424 ) 425 426 self._enqueue_server_responses( 427 _Method.READ, 428 ( 429 # First window: 1 chunk. 430 ( 431 transfer_pb2.Chunk( 432 transfer_id=_ARBITRARY_TRANSFER_ID, 433 type=transfer_pb2.Chunk.Type.DATA, 434 offset=0, 435 data=b'#' * test_max_chunk_size, 436 ), 437 ), 438 # Second window: 2 chunks. 439 ( 440 transfer_pb2.Chunk( 441 transfer_id=_ARBITRARY_TRANSFER_ID, 442 type=transfer_pb2.Chunk.Type.DATA, 443 offset=test_max_chunk_size, 444 data=b'#' * test_max_chunk_size, 445 ), 446 transfer_pb2.Chunk( 447 transfer_id=_ARBITRARY_TRANSFER_ID, 448 type=transfer_pb2.Chunk.Type.DATA, 449 offset=2 * test_max_chunk_size, 450 data=b'#' * test_max_chunk_size, 451 ), 452 ), 453 # Third window: finish transfer. 454 ( 455 transfer_pb2.Chunk( 456 transfer_id=_ARBITRARY_TRANSFER_ID, 457 type=transfer_pb2.Chunk.Type.DATA, 458 offset=3 * test_max_chunk_size, 459 data=b'#' * test_max_chunk_size, 460 remaining_bytes=0, 461 ), 462 ), 463 ), 464 ) 465 466 data = manager.read(_ARBITRARY_TRANSFER_ID) 467 468 self.assertEqual( 469 self._sent_chunks, 470 [ 471 # First parameters: 1 chunk window. 472 transfer_pb2.Chunk( 473 type=transfer_pb2.Chunk.Type.START, 474 transfer_id=_ARBITRARY_TRANSFER_ID, 475 resource_id=_ARBITRARY_TRANSFER_ID, 476 pending_bytes=test_max_chunk_size, 477 max_chunk_size_bytes=test_max_chunk_size, 478 window_end_offset=test_max_chunk_size, 479 ), 480 # Second parameters: 2 chunk window. 481 transfer_pb2.Chunk( 482 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 483 transfer_id=_ARBITRARY_TRANSFER_ID, 484 offset=test_max_chunk_size, 485 pending_bytes=2 * test_max_chunk_size, 486 max_chunk_size_bytes=test_max_chunk_size, 487 window_end_offset=( 488 test_max_chunk_size + 2 * test_max_chunk_size 489 ), 490 ), 491 # Third parameters: 4 chunk window. 492 transfer_pb2.Chunk( 493 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 494 transfer_id=_ARBITRARY_TRANSFER_ID, 495 offset=2 * test_max_chunk_size, 496 pending_bytes=4 * test_max_chunk_size, 497 max_chunk_size_bytes=test_max_chunk_size, 498 window_end_offset=( 499 2 * test_max_chunk_size + 4 * test_max_chunk_size 500 ), 501 ), 502 transfer_pb2.Chunk( 503 type=transfer_pb2.Chunk.Type.COMPLETION, 504 transfer_id=_ARBITRARY_TRANSFER_ID, 505 status=Status.OK.value, 506 ), 507 ], 508 ) 509 self.assertEqual(data, b'#' * (4 * test_max_chunk_size)) 510 511 def test_read_transfer_adaptive_window_congestion_avoidance(self) -> None: 512 test_max_chunk_size = 16 513 514 manager = pw_transfer.Manager( 515 self._service, 516 default_response_timeout_s=DEFAULT_TIMEOUT_S, 517 max_chunk_size_bytes=test_max_chunk_size, 518 default_protocol_version=ProtocolVersion.LEGACY, 519 ) 520 521 self._enqueue_server_responses( 522 _Method.READ, 523 ( 524 # First window: 1 chunk. 525 ( 526 transfer_pb2.Chunk( 527 transfer_id=_ARBITRARY_TRANSFER_ID, 528 type=transfer_pb2.Chunk.Type.DATA, 529 offset=0, 530 data=b'#' * test_max_chunk_size, 531 ), 532 ), 533 # Second window: 2 chunks. 534 ( 535 transfer_pb2.Chunk( 536 transfer_id=_ARBITRARY_TRANSFER_ID, 537 type=transfer_pb2.Chunk.Type.DATA, 538 offset=test_max_chunk_size, 539 data=b'#' * test_max_chunk_size, 540 ), 541 transfer_pb2.Chunk( 542 transfer_id=_ARBITRARY_TRANSFER_ID, 543 type=transfer_pb2.Chunk.Type.DATA, 544 offset=2 * test_max_chunk_size, 545 data=b'#' * test_max_chunk_size, 546 ), 547 ), 548 # Third window: send the wrong offset, triggering a 549 # retransmission. 550 ( 551 transfer_pb2.Chunk( 552 transfer_id=_ARBITRARY_TRANSFER_ID, 553 type=transfer_pb2.Chunk.Type.DATA, 554 offset=5 * test_max_chunk_size, 555 data=b'#' * test_max_chunk_size, 556 ), 557 ), 558 # Fourth window: send the expected offset. 559 ( 560 transfer_pb2.Chunk( 561 transfer_id=_ARBITRARY_TRANSFER_ID, 562 type=transfer_pb2.Chunk.Type.DATA, 563 offset=3 * test_max_chunk_size, 564 data=b'#' * test_max_chunk_size, 565 ), 566 transfer_pb2.Chunk( 567 transfer_id=_ARBITRARY_TRANSFER_ID, 568 type=transfer_pb2.Chunk.Type.DATA, 569 offset=4 * test_max_chunk_size, 570 data=b'#' * test_max_chunk_size, 571 ), 572 ), 573 # Fifth window: finish the transfer. 574 ( 575 transfer_pb2.Chunk( 576 transfer_id=_ARBITRARY_TRANSFER_ID, 577 type=transfer_pb2.Chunk.Type.DATA, 578 offset=5 * test_max_chunk_size, 579 data=b'#' * test_max_chunk_size, 580 remaining_bytes=0, 581 ), 582 ), 583 ), 584 ) 585 586 data = manager.read(_ARBITRARY_TRANSFER_ID) 587 588 self.assertEqual( 589 self._sent_chunks, 590 [ 591 # First parameters: 1 chunk window. 592 transfer_pb2.Chunk( 593 type=transfer_pb2.Chunk.Type.START, 594 transfer_id=_ARBITRARY_TRANSFER_ID, 595 resource_id=_ARBITRARY_TRANSFER_ID, 596 pending_bytes=test_max_chunk_size, 597 max_chunk_size_bytes=test_max_chunk_size, 598 window_end_offset=test_max_chunk_size, 599 ), 600 # Second parameters: 2 chunk window. 601 transfer_pb2.Chunk( 602 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 603 transfer_id=_ARBITRARY_TRANSFER_ID, 604 offset=test_max_chunk_size, 605 pending_bytes=2 * test_max_chunk_size, 606 max_chunk_size_bytes=test_max_chunk_size, 607 window_end_offset=( 608 test_max_chunk_size + 2 * test_max_chunk_size 609 ), 610 ), 611 # Third parameters: 4 chunk window. 612 transfer_pb2.Chunk( 613 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 614 transfer_id=_ARBITRARY_TRANSFER_ID, 615 offset=2 * test_max_chunk_size, 616 pending_bytes=4 * test_max_chunk_size, 617 max_chunk_size_bytes=test_max_chunk_size, 618 window_end_offset=( 619 2 * test_max_chunk_size + 4 * test_max_chunk_size 620 ), 621 ), 622 # Fourth parameters: data loss, retransmit and halve window. 623 transfer_pb2.Chunk( 624 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 625 transfer_id=_ARBITRARY_TRANSFER_ID, 626 offset=3 * test_max_chunk_size, 627 pending_bytes=2 * test_max_chunk_size, 628 max_chunk_size_bytes=test_max_chunk_size, 629 window_end_offset=( 630 3 * test_max_chunk_size + 2 * test_max_chunk_size 631 ), 632 ), 633 # Fifth parameters: in congestion avoidance, window size now 634 # only increases by one chunk instead of doubling. 635 transfer_pb2.Chunk( 636 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 637 transfer_id=_ARBITRARY_TRANSFER_ID, 638 offset=4 * test_max_chunk_size, 639 pending_bytes=3 * test_max_chunk_size, 640 max_chunk_size_bytes=test_max_chunk_size, 641 window_end_offset=( 642 4 * test_max_chunk_size + 3 * test_max_chunk_size 643 ), 644 ), 645 transfer_pb2.Chunk( 646 type=transfer_pb2.Chunk.Type.COMPLETION, 647 transfer_id=_ARBITRARY_TRANSFER_ID, 648 status=Status.OK.value, 649 ), 650 ], 651 ) 652 self.assertEqual(data, b'#' * (6 * test_max_chunk_size)) 653 654 def test_read_transfer_v2_adaptive_window_slow_start(self) -> None: 655 test_max_chunk_size = 16 656 657 manager = pw_transfer.Manager( 658 self._service, 659 default_response_timeout_s=DEFAULT_TIMEOUT_S, 660 max_chunk_size_bytes=test_max_chunk_size, 661 default_protocol_version=ProtocolVersion.VERSION_TWO, 662 ) 663 664 self._enqueue_server_responses( 665 _Method.READ, 666 ( 667 ( 668 transfer_pb2.Chunk( 669 session_id=_FIRST_SESSION_ID, 670 type=transfer_pb2.Chunk.Type.START_ACK, 671 protocol_version=ProtocolVersion.VERSION_TWO.value, 672 ), 673 ), 674 # First window: 1 chunk. 675 ( 676 transfer_pb2.Chunk( 677 session_id=_FIRST_SESSION_ID, 678 type=transfer_pb2.Chunk.Type.DATA, 679 offset=0, 680 data=b'#' * test_max_chunk_size, 681 ), 682 ), 683 # Second window: 2 chunks. 684 ( 685 transfer_pb2.Chunk( 686 session_id=_FIRST_SESSION_ID, 687 type=transfer_pb2.Chunk.Type.DATA, 688 offset=test_max_chunk_size, 689 data=b'#' * test_max_chunk_size, 690 ), 691 transfer_pb2.Chunk( 692 session_id=_FIRST_SESSION_ID, 693 type=transfer_pb2.Chunk.Type.DATA, 694 offset=2 * test_max_chunk_size, 695 data=b'#' * test_max_chunk_size, 696 ), 697 ), 698 # Third window: finish transfer. 699 ( 700 transfer_pb2.Chunk( 701 session_id=_FIRST_SESSION_ID, 702 type=transfer_pb2.Chunk.Type.DATA, 703 offset=3 * test_max_chunk_size, 704 data=b'#' * test_max_chunk_size, 705 remaining_bytes=0, 706 ), 707 ), 708 ( 709 transfer_pb2.Chunk( 710 session_id=_FIRST_SESSION_ID, 711 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 712 ), 713 ), 714 ), 715 ) 716 717 data = manager.read(_ARBITRARY_TRANSFER_ID) 718 719 self.assertEqual( 720 self._sent_chunks, 721 [ 722 transfer_pb2.Chunk( 723 transfer_id=_ARBITRARY_TRANSFER_ID, 724 resource_id=_ARBITRARY_TRANSFER_ID, 725 desired_session_id=_FIRST_SESSION_ID, 726 pending_bytes=test_max_chunk_size, 727 max_chunk_size_bytes=test_max_chunk_size, 728 window_end_offset=test_max_chunk_size, 729 type=transfer_pb2.Chunk.Type.START, 730 protocol_version=ProtocolVersion.VERSION_TWO.value, 731 ), 732 # First parameters: 1 chunk window. 733 transfer_pb2.Chunk( 734 session_id=_FIRST_SESSION_ID, 735 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 736 offset=0, 737 max_chunk_size_bytes=test_max_chunk_size, 738 window_end_offset=test_max_chunk_size, 739 protocol_version=ProtocolVersion.VERSION_TWO.value, 740 ), 741 # Second parameters: 2 chunk window. 742 transfer_pb2.Chunk( 743 session_id=_FIRST_SESSION_ID, 744 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 745 offset=test_max_chunk_size, 746 max_chunk_size_bytes=test_max_chunk_size, 747 window_end_offset=( 748 test_max_chunk_size + 2 * test_max_chunk_size 749 ), 750 ), 751 # Third parameters: 4 chunk window. 752 transfer_pb2.Chunk( 753 session_id=_FIRST_SESSION_ID, 754 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 755 offset=2 * test_max_chunk_size, 756 max_chunk_size_bytes=test_max_chunk_size, 757 window_end_offset=( 758 2 * test_max_chunk_size + 4 * test_max_chunk_size 759 ), 760 ), 761 transfer_pb2.Chunk( 762 session_id=_FIRST_SESSION_ID, 763 type=transfer_pb2.Chunk.Type.COMPLETION, 764 status=Status.OK.value, 765 ), 766 ], 767 ) 768 self.assertEqual(data, b'#' * (4 * test_max_chunk_size)) 769 770 def test_read_transfer_v2_adaptive_window_congestion_avoidance( 771 self, 772 ) -> None: 773 test_max_chunk_size = 16 774 775 manager = pw_transfer.Manager( 776 self._service, 777 default_response_timeout_s=DEFAULT_TIMEOUT_S, 778 max_chunk_size_bytes=test_max_chunk_size, 779 default_protocol_version=ProtocolVersion.VERSION_TWO, 780 ) 781 782 self._enqueue_server_responses( 783 _Method.READ, 784 ( 785 ( 786 transfer_pb2.Chunk( 787 session_id=_FIRST_SESSION_ID, 788 type=transfer_pb2.Chunk.Type.START_ACK, 789 protocol_version=ProtocolVersion.VERSION_TWO.value, 790 ), 791 ), 792 # First window: 1 chunk. 793 ( 794 transfer_pb2.Chunk( 795 session_id=_FIRST_SESSION_ID, 796 type=transfer_pb2.Chunk.Type.DATA, 797 offset=0, 798 data=b'#' * test_max_chunk_size, 799 ), 800 ), 801 # Second window: 2 chunks. 802 ( 803 transfer_pb2.Chunk( 804 session_id=_FIRST_SESSION_ID, 805 type=transfer_pb2.Chunk.Type.DATA, 806 offset=test_max_chunk_size, 807 data=b'#' * test_max_chunk_size, 808 ), 809 transfer_pb2.Chunk( 810 session_id=_FIRST_SESSION_ID, 811 type=transfer_pb2.Chunk.Type.DATA, 812 offset=2 * test_max_chunk_size, 813 data=b'#' * test_max_chunk_size, 814 ), 815 ), 816 # Third window: send the wrong offset, triggering a 817 # retransmission. 818 ( 819 transfer_pb2.Chunk( 820 session_id=_FIRST_SESSION_ID, 821 type=transfer_pb2.Chunk.Type.DATA, 822 offset=5 * test_max_chunk_size, 823 data=b'#' * test_max_chunk_size, 824 ), 825 ), 826 # Fourth window: send the expected offset. 827 ( 828 transfer_pb2.Chunk( 829 session_id=_FIRST_SESSION_ID, 830 type=transfer_pb2.Chunk.Type.DATA, 831 offset=3 * test_max_chunk_size, 832 data=b'#' * test_max_chunk_size, 833 ), 834 transfer_pb2.Chunk( 835 session_id=_FIRST_SESSION_ID, 836 type=transfer_pb2.Chunk.Type.DATA, 837 offset=4 * test_max_chunk_size, 838 data=b'#' * test_max_chunk_size, 839 ), 840 ), 841 # Fifth window: finish the transfer. 842 ( 843 transfer_pb2.Chunk( 844 session_id=_FIRST_SESSION_ID, 845 type=transfer_pb2.Chunk.Type.DATA, 846 offset=5 * test_max_chunk_size, 847 data=b'#' * test_max_chunk_size, 848 remaining_bytes=0, 849 ), 850 ), 851 ( 852 transfer_pb2.Chunk( 853 session_id=_FIRST_SESSION_ID, 854 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 855 ), 856 ), 857 ), 858 ) 859 860 data = manager.read(_ARBITRARY_TRANSFER_ID) 861 862 self.assertEqual( 863 self._sent_chunks, 864 [ 865 transfer_pb2.Chunk( 866 type=transfer_pb2.Chunk.Type.START, 867 transfer_id=_ARBITRARY_TRANSFER_ID, 868 resource_id=_ARBITRARY_TRANSFER_ID, 869 desired_session_id=_FIRST_SESSION_ID, 870 pending_bytes=test_max_chunk_size, 871 max_chunk_size_bytes=test_max_chunk_size, 872 window_end_offset=test_max_chunk_size, 873 protocol_version=ProtocolVersion.VERSION_TWO.value, 874 ), 875 # First parameters: 1 chunk window. 876 transfer_pb2.Chunk( 877 session_id=_FIRST_SESSION_ID, 878 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 879 offset=0, 880 max_chunk_size_bytes=test_max_chunk_size, 881 window_end_offset=test_max_chunk_size, 882 protocol_version=ProtocolVersion.VERSION_TWO.value, 883 ), 884 # Second parameters: 2 chunk window. 885 transfer_pb2.Chunk( 886 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 887 session_id=_FIRST_SESSION_ID, 888 offset=test_max_chunk_size, 889 max_chunk_size_bytes=test_max_chunk_size, 890 window_end_offset=( 891 test_max_chunk_size + 2 * test_max_chunk_size 892 ), 893 ), 894 # Third parameters: 4 chunk window. 895 transfer_pb2.Chunk( 896 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 897 session_id=_FIRST_SESSION_ID, 898 offset=2 * test_max_chunk_size, 899 max_chunk_size_bytes=test_max_chunk_size, 900 window_end_offset=( 901 2 * test_max_chunk_size + 4 * test_max_chunk_size 902 ), 903 ), 904 # Fourth parameters: data loss, retransmit and halve window. 905 transfer_pb2.Chunk( 906 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 907 session_id=_FIRST_SESSION_ID, 908 offset=3 * test_max_chunk_size, 909 max_chunk_size_bytes=test_max_chunk_size, 910 window_end_offset=( 911 3 * test_max_chunk_size + 2 * test_max_chunk_size 912 ), 913 ), 914 # Fifth parameters: in congestion avoidance, window size now 915 # only increases by one chunk instead of doubling. 916 transfer_pb2.Chunk( 917 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 918 session_id=_FIRST_SESSION_ID, 919 offset=4 * test_max_chunk_size, 920 max_chunk_size_bytes=test_max_chunk_size, 921 window_end_offset=( 922 4 * test_max_chunk_size + 3 * test_max_chunk_size 923 ), 924 ), 925 transfer_pb2.Chunk( 926 type=transfer_pb2.Chunk.Type.COMPLETION, 927 session_id=_FIRST_SESSION_ID, 928 status=Status.OK.value, 929 ), 930 ], 931 ) 932 self.assertEqual(data, b'#' * (6 * test_max_chunk_size)) 933 934 def test_write_transfer_basic(self) -> None: 935 manager = pw_transfer.Manager( 936 self._service, 937 default_response_timeout_s=DEFAULT_TIMEOUT_S, 938 ) 939 940 self._enqueue_server_responses( 941 _Method.WRITE, 942 ( 943 ( 944 transfer_pb2.Chunk( 945 transfer_id=4, 946 offset=0, 947 pending_bytes=32, 948 max_chunk_size_bytes=8, 949 ), 950 ), 951 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 952 ), 953 ) 954 955 manager.write(4, b'hello') 956 self.assertEqual(len(self._sent_chunks), 2) 957 self.assertEqual(self._received_data(), b'hello') 958 959 def test_write_transfer_max_chunk_size(self) -> None: 960 manager = pw_transfer.Manager( 961 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 962 ) 963 964 self._enqueue_server_responses( 965 _Method.WRITE, 966 ( 967 ( 968 transfer_pb2.Chunk( 969 transfer_id=4, 970 offset=0, 971 pending_bytes=32, 972 max_chunk_size_bytes=8, 973 ), 974 ), 975 (), 976 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 977 ), 978 ) 979 980 manager.write(4, b'hello world') 981 self.assertEqual(len(self._sent_chunks), 3) 982 self.assertEqual(self._received_data(), b'hello world') 983 self.assertEqual(self._sent_chunks[1].data, b'hello wo') 984 self.assertEqual(self._sent_chunks[2].data, b'rld') 985 986 def test_write_transfer_multiple_parameters(self) -> None: 987 manager = pw_transfer.Manager( 988 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 989 ) 990 991 self._enqueue_server_responses( 992 _Method.WRITE, 993 ( 994 ( 995 transfer_pb2.Chunk( 996 transfer_id=4, 997 offset=0, 998 pending_bytes=8, 999 max_chunk_size_bytes=8, 1000 ), 1001 ), 1002 ( 1003 transfer_pb2.Chunk( 1004 transfer_id=4, 1005 offset=8, 1006 pending_bytes=8, 1007 max_chunk_size_bytes=8, 1008 ), 1009 ), 1010 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1011 ), 1012 ) 1013 1014 manager.write(4, b'data to write') 1015 self.assertEqual(len(self._sent_chunks), 3) 1016 self.assertEqual(self._received_data(), b'data to write') 1017 self.assertEqual(self._sent_chunks[1].data, b'data to ') 1018 self.assertEqual(self._sent_chunks[2].data, b'write') 1019 1020 def test_write_transfer_progress_callback(self) -> None: 1021 manager = pw_transfer.Manager( 1022 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1023 ) 1024 1025 self._enqueue_server_responses( 1026 _Method.WRITE, 1027 ( 1028 ( 1029 transfer_pb2.Chunk( 1030 transfer_id=4, 1031 offset=0, 1032 pending_bytes=8, 1033 max_chunk_size_bytes=8, 1034 ), 1035 ), 1036 ( 1037 transfer_pb2.Chunk( 1038 transfer_id=4, 1039 offset=8, 1040 pending_bytes=8, 1041 max_chunk_size_bytes=8, 1042 ), 1043 ), 1044 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1045 ), 1046 ) 1047 1048 progress: list[pw_transfer.ProgressStats] = [] 1049 1050 manager.write(4, b'data to write', progress.append) 1051 self.assertEqual(len(self._sent_chunks), 3) 1052 self.assertEqual(self._received_data(), b'data to write') 1053 self.assertEqual(self._sent_chunks[1].data, b'data to ') 1054 self.assertEqual(self._sent_chunks[2].data, b'write') 1055 self.assertEqual( 1056 progress, 1057 [ 1058 pw_transfer.ProgressStats(8, 0, 13), 1059 pw_transfer.ProgressStats(13, 8, 13), 1060 pw_transfer.ProgressStats(13, 13, 13), 1061 ], 1062 ) 1063 1064 def test_write_transfer_rewind(self) -> None: 1065 """Write transfer in which the server re-requests an earlier offset.""" 1066 manager = pw_transfer.Manager( 1067 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1068 ) 1069 1070 self._enqueue_server_responses( 1071 _Method.WRITE, 1072 ( 1073 ( 1074 transfer_pb2.Chunk( 1075 transfer_id=4, 1076 offset=0, 1077 pending_bytes=8, 1078 max_chunk_size_bytes=8, 1079 ), 1080 ), 1081 ( 1082 transfer_pb2.Chunk( 1083 transfer_id=4, 1084 offset=8, 1085 pending_bytes=8, 1086 max_chunk_size_bytes=8, 1087 ), 1088 ), 1089 ( 1090 transfer_pb2.Chunk( 1091 transfer_id=4, 1092 offset=4, # rewind 1093 pending_bytes=8, 1094 max_chunk_size_bytes=8, 1095 ), 1096 ), 1097 ( 1098 transfer_pb2.Chunk( 1099 transfer_id=4, 1100 offset=12, 1101 pending_bytes=16, # update max size 1102 max_chunk_size_bytes=16, 1103 ), 1104 ), 1105 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1106 ), 1107 ) 1108 1109 manager.write(4, b'pigweed data transfer') 1110 self.assertEqual(len(self._sent_chunks), 5) 1111 self.assertEqual(self._sent_chunks[1].data, b'pigweed ') 1112 self.assertEqual(self._sent_chunks[2].data, b'data tra') 1113 self.assertEqual(self._sent_chunks[3].data, b'eed data') 1114 self.assertEqual(self._sent_chunks[4].data, b' transfer') 1115 1116 def test_write_transfer_bad_offset(self) -> None: 1117 manager = pw_transfer.Manager( 1118 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1119 ) 1120 1121 self._enqueue_server_responses( 1122 _Method.WRITE, 1123 ( 1124 ( 1125 transfer_pb2.Chunk( 1126 transfer_id=4, 1127 offset=0, 1128 pending_bytes=8, 1129 max_chunk_size_bytes=8, 1130 ), 1131 ), 1132 ( 1133 transfer_pb2.Chunk( 1134 transfer_id=4, 1135 offset=100, # larger offset than data 1136 pending_bytes=8, 1137 max_chunk_size_bytes=8, 1138 ), 1139 ), 1140 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1141 ), 1142 ) 1143 1144 with self.assertRaises(pw_transfer.Error) as context: 1145 manager.write(4, b'small data') 1146 1147 exception = context.exception 1148 self.assertEqual(exception.resource_id, 4) 1149 self.assertEqual(exception.status, Status.OUT_OF_RANGE) 1150 1151 def test_write_transfer_error(self) -> None: 1152 manager = pw_transfer.Manager( 1153 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1154 ) 1155 1156 self._enqueue_server_responses( 1157 _Method.WRITE, 1158 ( 1159 ( 1160 transfer_pb2.Chunk( 1161 transfer_id=21, status=Status.UNAVAILABLE.value 1162 ), 1163 ), 1164 ), 1165 ) 1166 1167 with self.assertRaises(pw_transfer.Error) as context: 1168 manager.write(21, b'no write') 1169 1170 exception = context.exception 1171 self.assertEqual(exception.resource_id, 21) 1172 self.assertEqual(exception.status, Status.UNAVAILABLE) 1173 1174 def test_write_transfer_server_error(self) -> None: 1175 manager = pw_transfer.Manager( 1176 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1177 ) 1178 1179 self._enqueue_server_error(_Method.WRITE, Status.NOT_FOUND) 1180 1181 with self.assertRaises(pw_transfer.Error) as context: 1182 manager.write(21, b'server error') 1183 1184 exception = context.exception 1185 self.assertEqual(exception.resource_id, 21) 1186 self.assertEqual(exception.status, Status.INTERNAL) 1187 1188 def test_write_transfer_timeout_after_initial_chunk(self) -> None: 1189 manager = pw_transfer.Manager( 1190 self._service, 1191 default_response_timeout_s=0.001, 1192 max_retries=2, 1193 default_protocol_version=ProtocolVersion.LEGACY, 1194 ) 1195 1196 with self.assertRaises(pw_transfer.Error) as context: 1197 manager.write(22, b'no server response!') 1198 1199 self.assertEqual( 1200 self._sent_chunks, 1201 [ 1202 transfer_pb2.Chunk( 1203 transfer_id=22, 1204 resource_id=22, 1205 type=transfer_pb2.Chunk.Type.START, 1206 ), # initial chunk 1207 transfer_pb2.Chunk( 1208 transfer_id=22, 1209 resource_id=22, 1210 type=transfer_pb2.Chunk.Type.START, 1211 ), # retry 1 1212 transfer_pb2.Chunk( 1213 transfer_id=22, 1214 resource_id=22, 1215 type=transfer_pb2.Chunk.Type.START, 1216 ), # retry 2 1217 ], 1218 ) 1219 1220 exception = context.exception 1221 self.assertEqual(exception.resource_id, 22) 1222 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 1223 1224 def test_write_transfer_timeout_after_intermediate_chunk(self) -> None: 1225 """Tests write transfers that timeout after the initial chunk.""" 1226 manager = pw_transfer.Manager( 1227 self._service, 1228 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1229 max_retries=2, 1230 default_protocol_version=ProtocolVersion.LEGACY, 1231 ) 1232 1233 self._enqueue_server_responses( 1234 _Method.WRITE, 1235 [ 1236 [ 1237 transfer_pb2.Chunk( 1238 transfer_id=22, pending_bytes=10, max_chunk_size_bytes=5 1239 ) 1240 ] 1241 ], 1242 ) 1243 1244 with self.assertRaises(pw_transfer.Error) as context: 1245 manager.write(22, b'0123456789') 1246 1247 last_data_chunk = transfer_pb2.Chunk( 1248 transfer_id=22, 1249 data=b'56789', 1250 offset=5, 1251 remaining_bytes=0, 1252 type=transfer_pb2.Chunk.Type.DATA, 1253 ) 1254 1255 self.assertEqual( 1256 self._sent_chunks, 1257 [ 1258 transfer_pb2.Chunk( 1259 transfer_id=22, 1260 resource_id=22, 1261 type=transfer_pb2.Chunk.Type.START, 1262 ), 1263 transfer_pb2.Chunk( 1264 transfer_id=22, 1265 data=b'01234', 1266 type=transfer_pb2.Chunk.Type.DATA, 1267 ), 1268 last_data_chunk, # last chunk 1269 last_data_chunk, # retry 1 1270 last_data_chunk, # retry 2 1271 ], 1272 ) 1273 1274 exception = context.exception 1275 self.assertEqual(exception.resource_id, 22) 1276 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 1277 1278 def test_write_zero_pending_bytes_is_internal_error(self) -> None: 1279 manager = pw_transfer.Manager( 1280 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1281 ) 1282 1283 self._enqueue_server_responses( 1284 _Method.WRITE, 1285 ((transfer_pb2.Chunk(transfer_id=23, pending_bytes=0),),), 1286 ) 1287 1288 with self.assertRaises(pw_transfer.Error) as context: 1289 manager.write(23, b'no write') 1290 1291 exception = context.exception 1292 self.assertEqual(exception.resource_id, 23) 1293 self.assertEqual(exception.status, Status.INTERNAL) 1294 1295 def test_v2_read_transfer_basic(self) -> None: 1296 """Tests a simple protocol version 2 read transfer.""" 1297 manager = pw_transfer.Manager( 1298 self._service, 1299 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1300 default_protocol_version=ProtocolVersion.VERSION_TWO, 1301 ) 1302 1303 self._enqueue_server_responses( 1304 _Method.READ, 1305 ( 1306 ( 1307 transfer_pb2.Chunk( 1308 resource_id=39, 1309 session_id=_FIRST_SESSION_ID, 1310 type=transfer_pb2.Chunk.Type.START_ACK, 1311 protocol_version=ProtocolVersion.VERSION_TWO.value, 1312 ), 1313 ), 1314 ( 1315 transfer_pb2.Chunk( 1316 session_id=_FIRST_SESSION_ID, 1317 type=transfer_pb2.Chunk.Type.DATA, 1318 offset=0, 1319 data=b'version two', 1320 remaining_bytes=0, 1321 ), 1322 ), 1323 ( 1324 transfer_pb2.Chunk( 1325 session_id=_FIRST_SESSION_ID, 1326 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1327 ), 1328 ), 1329 ), 1330 ) 1331 1332 data = manager.read(39) 1333 1334 self.assertEqual( 1335 self._sent_chunks, 1336 [ 1337 transfer_pb2.Chunk( 1338 transfer_id=39, 1339 resource_id=39, 1340 desired_session_id=_FIRST_SESSION_ID, 1341 pending_bytes=1024, 1342 max_chunk_size_bytes=1024, 1343 window_end_offset=1024, 1344 type=transfer_pb2.Chunk.Type.START, 1345 protocol_version=ProtocolVersion.VERSION_TWO.value, 1346 ), 1347 transfer_pb2.Chunk( 1348 session_id=_FIRST_SESSION_ID, 1349 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1350 max_chunk_size_bytes=1024, 1351 window_end_offset=1024, 1352 # pending_bytes should no longer exist as server and client 1353 # have agreed on v2. 1354 protocol_version=ProtocolVersion.VERSION_TWO.value, 1355 ), 1356 transfer_pb2.Chunk( 1357 session_id=_FIRST_SESSION_ID, 1358 type=transfer_pb2.Chunk.Type.COMPLETION, 1359 status=Status.OK.value, 1360 ), 1361 ], 1362 ) 1363 1364 self.assertEqual(data, b'version two') 1365 1366 def test_v2_read_transfer_legacy_fallback(self) -> None: 1367 """Tests a v2 read transfer when the server only supports legacy.""" 1368 manager = pw_transfer.Manager( 1369 self._service, 1370 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1371 default_protocol_version=ProtocolVersion.VERSION_TWO, 1372 ) 1373 1374 # Respond to the START chunk with a legacy data transfer chunk instead 1375 # of a START_ACK. 1376 self._enqueue_server_responses( 1377 _Method.READ, 1378 ( 1379 ( 1380 transfer_pb2.Chunk( 1381 transfer_id=40, 1382 type=transfer_pb2.Chunk.Type.DATA, 1383 offset=0, 1384 data=b'sorry, legacy only', 1385 remaining_bytes=0, 1386 ), 1387 ), 1388 ), 1389 ) 1390 1391 data = manager.read(40) 1392 1393 self.assertEqual( 1394 self._sent_chunks, 1395 [ 1396 transfer_pb2.Chunk( 1397 transfer_id=40, 1398 resource_id=40, 1399 desired_session_id=_FIRST_SESSION_ID, 1400 pending_bytes=1024, 1401 max_chunk_size_bytes=1024, 1402 window_end_offset=1024, 1403 type=transfer_pb2.Chunk.Type.START, 1404 protocol_version=ProtocolVersion.VERSION_TWO.value, 1405 ), 1406 transfer_pb2.Chunk( 1407 transfer_id=40, 1408 type=transfer_pb2.Chunk.Type.COMPLETION, 1409 status=Status.OK.value, 1410 ), 1411 ], 1412 ) 1413 1414 self.assertEqual(data, b'sorry, legacy only') 1415 1416 def test_v2_read_transfer_re_sent_data(self) -> None: 1417 """Tests a simple protocol version 2 read transfer.""" 1418 manager = pw_transfer.Manager( 1419 self._service, 1420 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1421 default_protocol_version=ProtocolVersion.VERSION_TWO, 1422 ) 1423 1424 self._enqueue_server_responses( 1425 _Method.READ, 1426 ( 1427 ( 1428 transfer_pb2.Chunk( 1429 resource_id=39, 1430 session_id=_FIRST_SESSION_ID, 1431 type=transfer_pb2.Chunk.Type.START_ACK, 1432 protocol_version=ProtocolVersion.VERSION_TWO.value, 1433 ), 1434 ), 1435 ( 1436 transfer_pb2.Chunk( 1437 session_id=_FIRST_SESSION_ID, 1438 type=transfer_pb2.Chunk.Type.DATA, 1439 offset=0, 1440 data=b'01234567', 1441 ), 1442 # Re-send already transmitted data. 1443 transfer_pb2.Chunk( 1444 session_id=_FIRST_SESSION_ID, 1445 type=transfer_pb2.Chunk.Type.DATA, 1446 offset=4, 1447 data=b'4567', 1448 ), 1449 ), 1450 ( 1451 transfer_pb2.Chunk( 1452 session_id=_FIRST_SESSION_ID, 1453 type=transfer_pb2.Chunk.Type.DATA, 1454 offset=8, 1455 data=b'89abcdef', 1456 remaining_bytes=0, 1457 ), 1458 ), 1459 ( 1460 transfer_pb2.Chunk( 1461 session_id=_FIRST_SESSION_ID, 1462 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1463 ), 1464 ), 1465 ), 1466 ) 1467 1468 data = manager.read(39) 1469 1470 self.assertEqual( 1471 self._sent_chunks, 1472 [ 1473 transfer_pb2.Chunk( 1474 transfer_id=39, 1475 resource_id=39, 1476 desired_session_id=_FIRST_SESSION_ID, 1477 pending_bytes=1024, 1478 max_chunk_size_bytes=1024, 1479 window_end_offset=1024, 1480 type=transfer_pb2.Chunk.Type.START, 1481 protocol_version=ProtocolVersion.VERSION_TWO.value, 1482 ), 1483 transfer_pb2.Chunk( 1484 session_id=_FIRST_SESSION_ID, 1485 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1486 max_chunk_size_bytes=1024, 1487 window_end_offset=1024, 1488 protocol_version=ProtocolVersion.VERSION_TWO.value, 1489 ), 1490 # Should send a continue chunk in response to retransmission. 1491 transfer_pb2.Chunk( 1492 session_id=_FIRST_SESSION_ID, 1493 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 1494 offset=8, 1495 max_chunk_size_bytes=1024, 1496 window_end_offset=1024, 1497 ), 1498 transfer_pb2.Chunk( 1499 session_id=_FIRST_SESSION_ID, 1500 type=transfer_pb2.Chunk.Type.COMPLETION, 1501 status=Status.OK.value, 1502 ), 1503 ], 1504 ) 1505 1506 self.assertEqual(data, b'0123456789abcdef') 1507 1508 def test_v2_write_transfer_basic(self) -> None: 1509 """Tests a simple protocol version 2 write transfer.""" 1510 manager = pw_transfer.Manager( 1511 self._service, 1512 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1513 default_protocol_version=ProtocolVersion.VERSION_TWO, 1514 ) 1515 1516 self._enqueue_server_responses( 1517 _Method.WRITE, 1518 ( 1519 ( 1520 transfer_pb2.Chunk( 1521 resource_id=72, 1522 session_id=_FIRST_SESSION_ID, 1523 type=transfer_pb2.Chunk.Type.START_ACK, 1524 protocol_version=ProtocolVersion.VERSION_TWO.value, 1525 ), 1526 ), 1527 ( 1528 transfer_pb2.Chunk( 1529 session_id=_FIRST_SESSION_ID, 1530 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 1531 offset=0, 1532 window_end_offset=32, 1533 max_chunk_size_bytes=8, 1534 ), 1535 ), 1536 (), # In response to the first data chunk. 1537 ( 1538 transfer_pb2.Chunk( 1539 session_id=_FIRST_SESSION_ID, 1540 type=transfer_pb2.Chunk.Type.COMPLETION, 1541 status=Status.OK.value, 1542 ), 1543 ), 1544 ), 1545 ) 1546 1547 manager.write(72, b'write version 2') 1548 1549 self.assertEqual( 1550 self._sent_chunks, 1551 [ 1552 transfer_pb2.Chunk( 1553 transfer_id=72, 1554 resource_id=72, 1555 desired_session_id=_FIRST_SESSION_ID, 1556 type=transfer_pb2.Chunk.Type.START, 1557 protocol_version=ProtocolVersion.VERSION_TWO.value, 1558 ), 1559 transfer_pb2.Chunk( 1560 session_id=_FIRST_SESSION_ID, 1561 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1562 protocol_version=ProtocolVersion.VERSION_TWO.value, 1563 ), 1564 transfer_pb2.Chunk( 1565 session_id=_FIRST_SESSION_ID, 1566 type=transfer_pb2.Chunk.Type.DATA, 1567 offset=0, 1568 data=b'write ve', 1569 ), 1570 transfer_pb2.Chunk( 1571 session_id=_FIRST_SESSION_ID, 1572 type=transfer_pb2.Chunk.Type.DATA, 1573 offset=8, 1574 data=b'rsion 2', 1575 remaining_bytes=0, 1576 ), 1577 transfer_pb2.Chunk( 1578 session_id=_FIRST_SESSION_ID, 1579 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1580 ), 1581 ], 1582 ) 1583 1584 self.assertEqual(self._received_data(), b'write version 2') 1585 1586 def test_v2_write_transfer_legacy_fallback(self) -> None: 1587 """Tests a v2 write transfer when the server only supports legacy.""" 1588 manager = pw_transfer.Manager( 1589 self._service, 1590 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1591 default_protocol_version=ProtocolVersion.VERSION_TWO, 1592 ) 1593 1594 self._enqueue_server_responses( 1595 _Method.WRITE, 1596 ( 1597 # Send a parameters chunk immediately per the legacy protocol. 1598 ( 1599 transfer_pb2.Chunk( 1600 transfer_id=76, 1601 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 1602 offset=0, 1603 pending_bytes=32, 1604 window_end_offset=32, 1605 max_chunk_size_bytes=8, 1606 ), 1607 ), 1608 (), # In response to the first data chunk. 1609 ( 1610 transfer_pb2.Chunk( 1611 transfer_id=76, 1612 type=transfer_pb2.Chunk.Type.COMPLETION, 1613 status=Status.OK.value, 1614 ), 1615 ), 1616 ), 1617 ) 1618 1619 manager.write(76, b'write v... NOPE') 1620 1621 self.assertEqual( 1622 self._sent_chunks, 1623 [ 1624 transfer_pb2.Chunk( 1625 transfer_id=76, 1626 resource_id=76, 1627 desired_session_id=_FIRST_SESSION_ID, 1628 type=transfer_pb2.Chunk.Type.START, 1629 protocol_version=ProtocolVersion.VERSION_TWO.value, 1630 ), 1631 transfer_pb2.Chunk( 1632 transfer_id=76, 1633 type=transfer_pb2.Chunk.Type.DATA, 1634 offset=0, 1635 data=b'write v.', 1636 ), 1637 transfer_pb2.Chunk( 1638 transfer_id=76, 1639 type=transfer_pb2.Chunk.Type.DATA, 1640 offset=8, 1641 data=b'.. NOPE', 1642 remaining_bytes=0, 1643 ), 1644 ], 1645 ) 1646 1647 self.assertEqual(self._received_data(), b'write v... NOPE') 1648 1649 def test_v2_server_error(self) -> None: 1650 """Tests a server error occurring during the opening handshake.""" 1651 1652 manager = pw_transfer.Manager( 1653 self._service, 1654 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1655 default_protocol_version=ProtocolVersion.VERSION_TWO, 1656 ) 1657 1658 self._enqueue_server_responses( 1659 _Method.READ, 1660 ( 1661 ( 1662 transfer_pb2.Chunk( 1663 resource_id=43, 1664 session_id=_FIRST_SESSION_ID, 1665 type=transfer_pb2.Chunk.Type.START_ACK, 1666 protocol_version=ProtocolVersion.VERSION_TWO.value, 1667 ), 1668 ), 1669 ( 1670 transfer_pb2.Chunk( 1671 session_id=_FIRST_SESSION_ID, 1672 type=transfer_pb2.Chunk.Type.COMPLETION, 1673 status=Status.DATA_LOSS.value, 1674 ), 1675 ), 1676 ), 1677 ) 1678 1679 with self.assertRaises(pw_transfer.Error) as context: 1680 manager.read(43) 1681 1682 self.assertEqual( 1683 self._sent_chunks, 1684 [ 1685 transfer_pb2.Chunk( 1686 transfer_id=43, 1687 desired_session_id=_FIRST_SESSION_ID, 1688 resource_id=43, 1689 pending_bytes=1024, 1690 max_chunk_size_bytes=1024, 1691 window_end_offset=1024, 1692 type=transfer_pb2.Chunk.Type.START, 1693 protocol_version=ProtocolVersion.VERSION_TWO.value, 1694 ), 1695 transfer_pb2.Chunk( 1696 session_id=_FIRST_SESSION_ID, 1697 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1698 max_chunk_size_bytes=1024, 1699 window_end_offset=1024, 1700 protocol_version=ProtocolVersion.VERSION_TWO.value, 1701 ), 1702 # Client sends a COMPLETION_ACK in response to the server. 1703 transfer_pb2.Chunk( 1704 session_id=_FIRST_SESSION_ID, 1705 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1706 ), 1707 ], 1708 ) 1709 1710 exception = context.exception 1711 self.assertEqual(exception.resource_id, 43) 1712 self.assertEqual(exception.status, Status.DATA_LOSS) 1713 1714 def test_v2_timeout_during_opening_handshake(self) -> None: 1715 """Tests a timeout occurring during the opening handshake.""" 1716 manager = pw_transfer.Manager( 1717 self._service, 1718 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1719 default_protocol_version=ProtocolVersion.VERSION_TWO, 1720 ) 1721 1722 # Don't enqueue any server responses. 1723 1724 with self.assertRaises(pw_transfer.Error) as context: 1725 manager.read(41) 1726 1727 start_chunk = transfer_pb2.Chunk( 1728 transfer_id=41, 1729 resource_id=41, 1730 desired_session_id=_FIRST_SESSION_ID, 1731 pending_bytes=1024, 1732 max_chunk_size_bytes=1024, 1733 window_end_offset=1024, 1734 type=transfer_pb2.Chunk.Type.START, 1735 protocol_version=ProtocolVersion.VERSION_TWO.value, 1736 ) 1737 1738 # The opening chunk should be sent initially, then retried three times. 1739 self.assertEqual(self._sent_chunks, [start_chunk] * 4) 1740 1741 exception = context.exception 1742 self.assertEqual(exception.resource_id, 41) 1743 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 1744 1745 def test_v2_timeout_recovery_during_opening_handshake(self) -> None: 1746 """Tests a timeout during the opening handshake which recovers.""" 1747 manager = pw_transfer.Manager( 1748 self._service, 1749 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1750 default_protocol_version=ProtocolVersion.VERSION_TWO, 1751 ) 1752 1753 self._enqueue_server_responses( 1754 _Method.WRITE, 1755 ( 1756 ( 1757 transfer_pb2.Chunk( 1758 resource_id=73, 1759 session_id=_FIRST_SESSION_ID, 1760 type=transfer_pb2.Chunk.Type.START_ACK, 1761 protocol_version=ProtocolVersion.VERSION_TWO.value, 1762 ), 1763 ), 1764 (), # Don't respond to the START_ACK_CONFIRMATION. 1765 (), # Don't respond to the first START_ACK_CONFIRMATION retry. 1766 ( 1767 transfer_pb2.Chunk( 1768 session_id=_FIRST_SESSION_ID, 1769 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 1770 offset=0, 1771 window_end_offset=32, 1772 max_chunk_size_bytes=8, 1773 ), 1774 ), 1775 (), # In response to the first data chunk. 1776 ( 1777 transfer_pb2.Chunk( 1778 session_id=_FIRST_SESSION_ID, 1779 type=transfer_pb2.Chunk.Type.COMPLETION, 1780 status=Status.OK.value, 1781 ), 1782 ), 1783 ), 1784 ) 1785 1786 manager.write(73, b'write timeout 2') 1787 1788 start_ack_confirmation = transfer_pb2.Chunk( 1789 session_id=_FIRST_SESSION_ID, 1790 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1791 protocol_version=ProtocolVersion.VERSION_TWO.value, 1792 ) 1793 1794 self.assertEqual( 1795 self._sent_chunks, 1796 [ 1797 transfer_pb2.Chunk( 1798 transfer_id=73, 1799 resource_id=73, 1800 desired_session_id=_FIRST_SESSION_ID, 1801 type=transfer_pb2.Chunk.Type.START, 1802 protocol_version=ProtocolVersion.VERSION_TWO.value, 1803 ), 1804 start_ack_confirmation, # Initial transmission 1805 start_ack_confirmation, # Retry 1 1806 start_ack_confirmation, # Retry 2 1807 transfer_pb2.Chunk( 1808 session_id=_FIRST_SESSION_ID, 1809 type=transfer_pb2.Chunk.Type.DATA, 1810 offset=0, 1811 data=b'write ti', 1812 ), 1813 transfer_pb2.Chunk( 1814 session_id=_FIRST_SESSION_ID, 1815 type=transfer_pb2.Chunk.Type.DATA, 1816 offset=8, 1817 data=b'meout 2', 1818 remaining_bytes=0, 1819 ), 1820 transfer_pb2.Chunk( 1821 session_id=_FIRST_SESSION_ID, 1822 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1823 ), 1824 ], 1825 ) 1826 1827 self.assertEqual(self._received_data(), b'write timeout 2') 1828 1829 def test_v2_closing_handshake_bad_chunk(self) -> None: 1830 """Tests an unexpected chunk response during the closing handshake.""" 1831 manager = pw_transfer.Manager( 1832 self._service, 1833 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1834 default_protocol_version=ProtocolVersion.VERSION_TWO, 1835 ) 1836 1837 self._enqueue_server_responses( 1838 _Method.READ, 1839 ( 1840 ( 1841 transfer_pb2.Chunk( 1842 resource_id=47, 1843 session_id=_FIRST_SESSION_ID, 1844 type=transfer_pb2.Chunk.Type.START_ACK, 1845 protocol_version=ProtocolVersion.VERSION_TWO.value, 1846 ), 1847 ), 1848 ( 1849 transfer_pb2.Chunk( 1850 session_id=_FIRST_SESSION_ID, 1851 type=transfer_pb2.Chunk.Type.DATA, 1852 offset=0, 1853 data=b'version two', 1854 remaining_bytes=0, 1855 ), 1856 ), 1857 # In response to the COMPLETION, re-send the last chunk instead 1858 # of a COMPLETION_ACK. 1859 ( 1860 transfer_pb2.Chunk( 1861 session_id=_FIRST_SESSION_ID, 1862 type=transfer_pb2.Chunk.Type.DATA, 1863 offset=0, 1864 data=b'version two', 1865 remaining_bytes=0, 1866 ), 1867 ), 1868 ( 1869 transfer_pb2.Chunk( 1870 session_id=_FIRST_SESSION_ID, 1871 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1872 ), 1873 ), 1874 ), 1875 ) 1876 1877 data = manager.read(47) 1878 1879 self.assertEqual( 1880 self._sent_chunks, 1881 [ 1882 transfer_pb2.Chunk( 1883 transfer_id=47, 1884 resource_id=47, 1885 desired_session_id=_FIRST_SESSION_ID, 1886 pending_bytes=1024, 1887 max_chunk_size_bytes=1024, 1888 window_end_offset=1024, 1889 type=transfer_pb2.Chunk.Type.START, 1890 protocol_version=ProtocolVersion.VERSION_TWO.value, 1891 ), 1892 transfer_pb2.Chunk( 1893 session_id=_FIRST_SESSION_ID, 1894 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1895 max_chunk_size_bytes=1024, 1896 window_end_offset=1024, 1897 protocol_version=ProtocolVersion.VERSION_TWO.value, 1898 ), 1899 transfer_pb2.Chunk( 1900 session_id=_FIRST_SESSION_ID, 1901 type=transfer_pb2.Chunk.Type.COMPLETION, 1902 status=Status.OK.value, 1903 ), 1904 # Completion should be re-sent following the repeated chunk. 1905 transfer_pb2.Chunk( 1906 session_id=_FIRST_SESSION_ID, 1907 type=transfer_pb2.Chunk.Type.COMPLETION, 1908 status=Status.OK.value, 1909 ), 1910 ], 1911 ) 1912 1913 self.assertEqual(data, b'version two') 1914 1915 def test_v2_timeout_during_closing_handshake(self) -> None: 1916 """Tests a timeout occurring during the closing handshake.""" 1917 manager = pw_transfer.Manager( 1918 self._service, 1919 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1920 default_protocol_version=ProtocolVersion.VERSION_TWO, 1921 ) 1922 1923 self._enqueue_server_responses( 1924 _Method.READ, 1925 ( 1926 ( 1927 transfer_pb2.Chunk( 1928 resource_id=47, 1929 session_id=_FIRST_SESSION_ID, 1930 type=transfer_pb2.Chunk.Type.START_ACK, 1931 protocol_version=ProtocolVersion.VERSION_TWO.value, 1932 ), 1933 ), 1934 ( 1935 transfer_pb2.Chunk( 1936 session_id=_FIRST_SESSION_ID, 1937 type=transfer_pb2.Chunk.Type.DATA, 1938 offset=0, 1939 data=b'dropped completion', 1940 remaining_bytes=0, 1941 ), 1942 ), 1943 # Never send the expected COMPLETION_ACK chunk. 1944 ), 1945 ) 1946 1947 data = manager.read(47) 1948 1949 self.assertEqual( 1950 self._sent_chunks, 1951 [ 1952 transfer_pb2.Chunk( 1953 transfer_id=47, 1954 resource_id=47, 1955 desired_session_id=_FIRST_SESSION_ID, 1956 pending_bytes=1024, 1957 max_chunk_size_bytes=1024, 1958 window_end_offset=1024, 1959 type=transfer_pb2.Chunk.Type.START, 1960 protocol_version=ProtocolVersion.VERSION_TWO.value, 1961 ), 1962 transfer_pb2.Chunk( 1963 session_id=_FIRST_SESSION_ID, 1964 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1965 max_chunk_size_bytes=1024, 1966 window_end_offset=1024, 1967 protocol_version=ProtocolVersion.VERSION_TWO.value, 1968 ), 1969 transfer_pb2.Chunk( 1970 session_id=_FIRST_SESSION_ID, 1971 type=transfer_pb2.Chunk.Type.COMPLETION, 1972 status=Status.OK.value, 1973 ), 1974 # The completion should be retried per the usual retry flow. 1975 transfer_pb2.Chunk( 1976 session_id=_FIRST_SESSION_ID, 1977 type=transfer_pb2.Chunk.Type.COMPLETION, 1978 status=Status.OK.value, 1979 ), 1980 transfer_pb2.Chunk( 1981 session_id=_FIRST_SESSION_ID, 1982 type=transfer_pb2.Chunk.Type.COMPLETION, 1983 status=Status.OK.value, 1984 ), 1985 transfer_pb2.Chunk( 1986 session_id=_FIRST_SESSION_ID, 1987 type=transfer_pb2.Chunk.Type.COMPLETION, 1988 status=Status.OK.value, 1989 ), 1990 ], 1991 ) 1992 1993 # Despite timing out following several retries, the transfer should 1994 # still conclude successfully, as failing to receive a COMPLETION_ACK 1995 # is not fatal. 1996 self.assertEqual(data, b'dropped completion') 1997 1998 1999class ProgressStatsTest(unittest.TestCase): 2000 def test_received_percent_known_total(self) -> None: 2001 self.assertEqual( 2002 pw_transfer.ProgressStats(75, 0, 100).percent_received(), 0.0 2003 ) 2004 self.assertEqual( 2005 pw_transfer.ProgressStats(75, 50, 100).percent_received(), 50.0 2006 ) 2007 self.assertEqual( 2008 pw_transfer.ProgressStats(100, 100, 100).percent_received(), 100.0 2009 ) 2010 2011 def test_received_percent_unknown_total(self) -> None: 2012 self.assertTrue( 2013 math.isnan( 2014 pw_transfer.ProgressStats(75, 50, None).percent_received() 2015 ) 2016 ) 2017 self.assertTrue( 2018 math.isnan( 2019 pw_transfer.ProgressStats(100, 100, None).percent_received() 2020 ) 2021 ) 2022 2023 def test_str_known_total(self) -> None: 2024 stats = str(pw_transfer.ProgressStats(75, 50, 100)) 2025 self.assertIn('75', stats) 2026 self.assertIn('50', stats) 2027 self.assertIn('100', stats) 2028 2029 def test_str_unknown_total(self) -> None: 2030 stats = str(pw_transfer.ProgressStats(75, 50, None)) 2031 self.assertIn('75', stats) 2032 self.assertIn('50', stats) 2033 self.assertIn('unknown', stats) 2034 2035 2036if __name__ == '__main__': 2037 # TODO: b/265975025 - Only run this test in upstream Pigweed until the 2038 # occasional hangs are fixed. 2039 if os.environ.get('PW_ROOT') and os.environ.get( 2040 'PW_ROOT' 2041 ) == os.environ.get('PW_PROJECT_ROOT'): 2042 unittest.main() 2043 else: 2044 print('Skipping transfer_test.py due to possible hangs (b/265975025).') 2045