1# Copyright 2023 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Protocol version-aware chunk message wrapper.""" 15 16from __future__ import annotations 17 18import enum 19from typing import Any 20 21from pw_status import Status 22 23from pw_transfer import transfer_pb2 24 25 26class ProtocolVersion(enum.Enum): 27 """Supported versions of pw_transfer's RPC data transfer protocol.""" 28 29 # Protocol version not known or not set. 30 UNKNOWN = 0 31 32 # The original transfer protocol, prior to transfer start/end handshakes. 33 LEGACY = 1 34 35 # Second version of the transfer protocol. Guarantees type fields on all 36 # chunks, deprecates pending_bytes in favor of window_end_offset, splits 37 # transfer resource IDs from ephemeral session IDs, and adds a handshake 38 # to the start and end of all transfer sessions. 39 VERSION_TWO = 2 40 41 # Alias to the most up-to-date version of the transfer protocol. 42 LATEST = VERSION_TWO 43 44 45_ChunkType = transfer_pb2.Chunk.Type 46 47 48class Chunk: 49 """A chunk exchanged in a pw_transfer stream. 50 51 Wraps the generated protobuf Chunk class with protocol-aware field encoding 52 and decoding. 53 54 Attributes: 55 protocol_version: Version of the transfer protocol with which the chunk 56 is encoded. 57 chunk_type: Type of the chunk within the protocol. 58 session_id: ID for the transfer session to which the chunk belongs. 59 desired_session_id: For a v2 START chunk, the client-assigned session ID 60 to request from the server. 61 resource_id: For a v2 START chunk, ID of the resource to transfer. 62 offset: Offset of the data to be transferred. 63 window_end_offset: In a parameters chunk, end offset of the available 64 window. 65 data: Raw transfer data. 66 remaining_bytes: Optional number of bytes remaining in the transfer. 67 Set to 0 when the data is fully transferred. 68 max_chunk_size_bytes: Maximum number of bytes to send in a single data 69 chunk. 70 min_delay_microseconds: Delay between data chunks to be sent. 71 """ 72 73 # pylint: disable=too-many-instance-attributes 74 75 Type = transfer_pb2.Chunk.Type 76 77 # TODO(frolv): Figure out how to make the chunk type annotation work. 78 # pylint: disable=too-many-arguments 79 def __init__( 80 self, 81 protocol_version: ProtocolVersion, 82 chunk_type: Any, 83 session_id: int = 0, 84 desired_session_id: int | None = None, 85 resource_id: int | None = None, 86 offset: int = 0, 87 window_end_offset: int = 0, 88 data: bytes = b'', 89 remaining_bytes: int | None = None, 90 max_chunk_size_bytes: int | None = None, 91 min_delay_microseconds: int | None = None, 92 status: Status | None = None, 93 initial_offset: int = 0, 94 ): 95 """Creates a new transfer chunk. 96 97 Args: 98 protocol_version: Version of the transfer protocol with which to 99 encode the chunk. 100 chunk_type: Type of the chunk within the protocol. 101 session_id: ID for the transfer session to which the chunk belongs. 102 desired_session_id: For a v2 START chunk, the client-assigned 103 session ID to request from the server. 104 resource_id: For a v2 START chunk, ID of the resource to transfer. 105 offset: Offset of the data to be transferred. 106 window_end_offset: In a parameters chunk, end offset of the 107 available window. 108 data: Raw transfer data. 109 remaining_bytes: Optional number of bytes remaining in the transfer. 110 Set to 0 when the data is fully transferred. 111 max_chunk_size_bytes: Maximum number of bytes to send in a single 112 data chunk. 113 min_delay_microseconds: Delay between data chunks to be sent. 114 status: In a COMPLETION chunk, final status of the transfer. 115 initial_offset: Initial offset for non-zero starting offset 116 transfers 117 """ 118 self.protocol_version = protocol_version 119 self.type = chunk_type 120 self.session_id = session_id 121 self.desired_session_id = desired_session_id 122 self.resource_id = resource_id 123 self.offset = offset 124 self.window_end_offset = window_end_offset 125 self.data = data 126 self.remaining_bytes = remaining_bytes 127 self.max_chunk_size_bytes = max_chunk_size_bytes 128 self.min_delay_microseconds = min_delay_microseconds 129 self.status = status 130 self.initial_offset = initial_offset 131 132 @classmethod 133 def from_message(cls, message: transfer_pb2.Chunk) -> Chunk: 134 """Parses a Chunk from a protobuf message.""" 135 136 # Some very old versions of transfer don't always encode chunk types, 137 # so they must be deduced. 138 # 139 # The type-less legacy transfer protocol doesn't support handshakes or 140 # continuation parameters. Therefore, there are only three possible 141 # types: start, data, and retransmit. 142 if message.HasField('type'): 143 chunk_type = message.type 144 elif ( 145 message.offset == 0 146 and not message.data 147 and not message.HasField('status') 148 ): 149 chunk_type = Chunk.Type.START 150 elif message.data: 151 chunk_type = Chunk.Type.DATA 152 else: 153 chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT 154 155 chunk = cls( 156 ProtocolVersion.UNKNOWN, 157 chunk_type, 158 offset=message.offset, 159 window_end_offset=message.window_end_offset, 160 data=message.data, 161 initial_offset=message.initial_offset, 162 ) 163 164 if message.HasField('session_id'): 165 chunk.protocol_version = ProtocolVersion.VERSION_TWO 166 chunk.session_id = message.session_id 167 else: 168 chunk.protocol_version = ProtocolVersion.LEGACY 169 chunk.session_id = message.transfer_id 170 171 if message.HasField('desired_session_id'): 172 chunk.protocol_version = ProtocolVersion.VERSION_TWO 173 chunk.desired_session_id = message.desired_session_id 174 175 if message.HasField('resource_id'): 176 chunk.resource_id = message.resource_id 177 178 if message.HasField('protocol_version'): 179 # An explicitly specified protocol version overrides any inferred 180 # one. 181 chunk.protocol_version = ProtocolVersion(message.protocol_version) 182 183 if message.HasField('pending_bytes'): 184 chunk.window_end_offset = message.offset + message.pending_bytes 185 186 if message.HasField('remaining_bytes'): 187 chunk.remaining_bytes = message.remaining_bytes 188 189 if message.HasField('max_chunk_size_bytes'): 190 chunk.max_chunk_size_bytes = message.max_chunk_size_bytes 191 192 if message.HasField('min_delay_microseconds'): 193 chunk.min_delay_microseconds = message.min_delay_microseconds 194 195 if message.HasField('status'): 196 chunk.status = Status(message.status) 197 198 if chunk.protocol_version is ProtocolVersion.UNKNOWN: 199 # If no fields in the chunk specified its protocol version, 200 # assume it is a legacy chunk. 201 chunk.protocol_version = ProtocolVersion.LEGACY 202 203 return chunk 204 205 def to_message(self) -> transfer_pb2.Chunk: 206 """Converts the chunk to a protobuf message.""" 207 message = transfer_pb2.Chunk( 208 offset=self.offset, 209 window_end_offset=self.window_end_offset, 210 type=self.type, 211 ) 212 213 if self.resource_id is not None: 214 message.resource_id = self.resource_id 215 216 if self.protocol_version is ProtocolVersion.VERSION_TWO: 217 if self.session_id != 0: 218 assert self.desired_session_id is None 219 message.session_id = self.session_id 220 221 if self.desired_session_id is not None: 222 message.desired_session_id = self.desired_session_id 223 224 if self._should_encode_legacy_fields(): 225 if self.resource_id is not None: 226 message.transfer_id = self.resource_id 227 else: 228 assert self.session_id != 0 229 message.transfer_id = self.session_id 230 231 # In the legacy protocol, the pending_bytes field must be set 232 # alongside window_end_offset, as some transfer implementations 233 # require it. 234 if self.window_end_offset != 0: 235 message.pending_bytes = self.window_end_offset - self.offset 236 237 if self.data: 238 message.data = self.data 239 240 if self.remaining_bytes is not None: 241 message.remaining_bytes = self.remaining_bytes 242 243 if self.max_chunk_size_bytes is not None: 244 message.max_chunk_size_bytes = self.max_chunk_size_bytes 245 246 if self.min_delay_microseconds is not None: 247 message.min_delay_microseconds = self.min_delay_microseconds 248 249 if self.status is not None: 250 message.status = self.status.value 251 252 if self._is_initial_handshake_chunk(): 253 # During the initial handshake, the desired protocol version is 254 # explictly encoded. 255 message.protocol_version = self.protocol_version.value 256 257 message.initial_offset = self.initial_offset 258 259 return message 260 261 def id(self) -> int: 262 """Returns the transfer context identifier for a chunk. 263 264 Depending on the protocol version and type of chunk, this may correspond 265 to one of several proto fields. 266 """ 267 return self.session_id 268 269 def requests_transmission_from_offset(self) -> bool: 270 """Returns True if this chunk is requesting a retransmission.""" 271 return ( 272 self.type is Chunk.Type.PARAMETERS_RETRANSMIT 273 or self.type is Chunk.Type.START 274 or self.type is Chunk.Type.START_ACK_CONFIRMATION 275 ) 276 277 def _is_initial_handshake_chunk(self) -> bool: 278 return self.protocol_version is ProtocolVersion.VERSION_TWO and ( 279 self.type is Chunk.Type.START 280 or self.type is Chunk.Type.START_ACK 281 or self.type is Chunk.Type.START_ACK_CONFIRMATION 282 ) 283 284 def _should_encode_legacy_fields(self) -> bool: 285 return ( 286 self.protocol_version is ProtocolVersion.LEGACY 287 or self.type is Chunk.Type.START 288 ) 289