xref: /aosp_15_r20/external/pigweed/pw_transfer/py/pw_transfer/chunk.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
14"""Protocol version-aware chunk message wrapper."""
15
16from __future__ import annotations
17
18import enum
19from typing import Any
20
21from pw_status import Status
22
23from pw_transfer import transfer_pb2
24
25
class ProtocolVersion(enum.Enum):
    """Supported versions of pw_transfer's RPC data transfer protocol.

    LATEST is an alias of VERSION_TWO rather than a distinct member, so it is
    skipped when iterating over the enum and `ProtocolVersion(2)` resolves to
    VERSION_TWO.
    """

    # Protocol version not known or not set.
    UNKNOWN = 0

    # The original transfer protocol, prior to transfer start/end handshakes.
    LEGACY = 1

    # Second version of the transfer protocol. Guarantees type fields on all
    # chunks, deprecates pending_bytes in favor of window_end_offset, splits
    # transfer resource IDs from ephemeral session IDs, and adds a handshake
    # to the start and end of all transfer sessions.
    VERSION_TWO = 2

    # Alias to the most up-to-date version of the transfer protocol.
    LATEST = VERSION_TWO
43
44
# Module-private shorthand for the generated protobuf enum of chunk types.
_ChunkType = transfer_pb2.Chunk.Type
46
47
class Chunk:
    """A chunk exchanged in a pw_transfer stream.

    Wraps the generated protobuf Chunk class with protocol-aware field encoding
    and decoding.

    Attributes:
        protocol_version: Version of the transfer protocol with which the chunk
            is encoded.
        type: Type of the chunk within the protocol (a value of
            transfer_pb2.Chunk.Type).
        session_id: ID for the transfer session to which the chunk belongs.
        desired_session_id: For a v2 START chunk, the client-assigned session ID
            to request from the server.
        resource_id: For a v2 START chunk, ID of the resource to transfer.
        offset: Offset of the data to be transferred.
        window_end_offset: In a parameters chunk, end offset of the available
            window.
        data: Raw transfer data.
        remaining_bytes: Optional number of bytes remaining in the transfer.
            Set to 0 when the data is fully transferred.
        max_chunk_size_bytes: Maximum number of bytes to send in a single data
            chunk.
        min_delay_microseconds: Delay between data chunks to be sent.
        status: In a COMPLETION chunk, final status of the transfer.
        initial_offset: Initial offset for non-zero starting offset transfers.
    """

    # pylint: disable=too-many-instance-attributes

    Type = transfer_pb2.Chunk.Type

    # TODO(frolv): Figure out how to make the chunk type annotation work.
    # pylint: disable=too-many-arguments
    def __init__(
        self,
        protocol_version: ProtocolVersion,
        chunk_type: Any,
        session_id: int = 0,
        desired_session_id: int | None = None,
        resource_id: int | None = None,
        offset: int = 0,
        window_end_offset: int = 0,
        data: bytes = b'',
        remaining_bytes: int | None = None,
        max_chunk_size_bytes: int | None = None,
        min_delay_microseconds: int | None = None,
        status: Status | None = None,
        initial_offset: int = 0,
    ):
        """Creates a new transfer chunk.

        Args:
            protocol_version: Version of the transfer protocol with which to
                encode the chunk.
            chunk_type: Type of the chunk within the protocol. Stored on the
                instance as the `type` attribute.
            session_id: ID for the transfer session to which the chunk belongs.
            desired_session_id: For a v2 START chunk, the client-assigned
                session ID to request from the server.
            resource_id: For a v2 START chunk, ID of the resource to transfer.
            offset: Offset of the data to be transferred.
            window_end_offset: In a parameters chunk, end offset of the
                available window.
            data: Raw transfer data.
            remaining_bytes: Optional number of bytes remaining in the transfer.
                Set to 0 when the data is fully transferred.
            max_chunk_size_bytes: Maximum number of bytes to send in a single
                data chunk.
            min_delay_microseconds: Delay between data chunks to be sent.
            status: In a COMPLETION chunk, final status of the transfer.
            initial_offset: Initial offset for non-zero starting offset
                transfers.
        """
        self.protocol_version = protocol_version
        self.type = chunk_type
        self.session_id = session_id
        self.desired_session_id = desired_session_id
        self.resource_id = resource_id
        self.offset = offset
        self.window_end_offset = window_end_offset
        self.data = data
        self.remaining_bytes = remaining_bytes
        self.max_chunk_size_bytes = max_chunk_size_bytes
        self.min_delay_microseconds = min_delay_microseconds
        self.status = status
        self.initial_offset = initial_offset

    def __repr__(self) -> str:
        """Returns a debugging representation listing the chunk's fields."""
        return (
            f'{type(self).__name__}('
            f'protocol_version={self.protocol_version!r}, '
            f'type={self.type!r}, '
            f'session_id={self.session_id!r}, '
            f'desired_session_id={self.desired_session_id!r}, '
            f'resource_id={self.resource_id!r}, '
            f'offset={self.offset!r}, '
            f'window_end_offset={self.window_end_offset!r}, '
            f'data={self.data!r}, '
            f'remaining_bytes={self.remaining_bytes!r}, '
            f'max_chunk_size_bytes={self.max_chunk_size_bytes!r}, '
            f'min_delay_microseconds={self.min_delay_microseconds!r}, '
            f'status={self.status!r}, '
            f'initial_offset={self.initial_offset!r})'
        )

    @classmethod
    def from_message(cls, message: transfer_pb2.Chunk) -> Chunk:
        """Parses a Chunk from a protobuf message.

        Args:
            message: The protobuf chunk to decode.

        Returns:
            A Chunk populated from the message. Its type is deduced when the
            message carries no type field, and its protocol version is
            inferred from which ID fields are present unless the message
            specifies one explicitly.

        Raises:
            ValueError: If the message's explicit protocol_version does not
                correspond to a ProtocolVersion member.
        """

        # Some very old versions of transfer don't always encode chunk types,
        # so they must be deduced.
        #
        # The type-less legacy transfer protocol doesn't support handshakes or
        # continuation parameters. Therefore, there are only three possible
        # types: start, data, and retransmit.
        if message.HasField('type'):
            chunk_type = message.type
        elif (
            message.offset == 0
            and not message.data
            and not message.HasField('status')
        ):
            chunk_type = Chunk.Type.START
        elif message.data:
            chunk_type = Chunk.Type.DATA
        else:
            chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT

        # The version starts out as UNKNOWN and is refined below from the
        # fields that are present in the message.
        chunk = cls(
            ProtocolVersion.UNKNOWN,
            chunk_type,
            offset=message.offset,
            window_end_offset=message.window_end_offset,
            data=message.data,
            initial_offset=message.initial_offset,
        )

        # A session_id field implies version two; otherwise the chunk is
        # treated as legacy and identified by its transfer_id.
        if message.HasField('session_id'):
            chunk.protocol_version = ProtocolVersion.VERSION_TWO
            chunk.session_id = message.session_id
        else:
            chunk.protocol_version = ProtocolVersion.LEGACY
            chunk.session_id = message.transfer_id

        if message.HasField('desired_session_id'):
            chunk.protocol_version = ProtocolVersion.VERSION_TWO
            chunk.desired_session_id = message.desired_session_id

        if message.HasField('resource_id'):
            chunk.resource_id = message.resource_id

        if message.HasField('protocol_version'):
            # An explicitly specified protocol version overrides any inferred
            # one.
            chunk.protocol_version = ProtocolVersion(message.protocol_version)

        # When pending_bytes is present it takes precedence over the
        # message's own window_end_offset.
        if message.HasField('pending_bytes'):
            chunk.window_end_offset = message.offset + message.pending_bytes

        if message.HasField('remaining_bytes'):
            chunk.remaining_bytes = message.remaining_bytes

        if message.HasField('max_chunk_size_bytes'):
            chunk.max_chunk_size_bytes = message.max_chunk_size_bytes

        if message.HasField('min_delay_microseconds'):
            chunk.min_delay_microseconds = message.min_delay_microseconds

        if message.HasField('status'):
            chunk.status = Status(message.status)

        if chunk.protocol_version is ProtocolVersion.UNKNOWN:
            # If no fields in the chunk specified its protocol version,
            # assume it is a legacy chunk.
            chunk.protocol_version = ProtocolVersion.LEGACY

        return chunk

    def to_message(self) -> transfer_pb2.Chunk:
        """Converts the chunk to a protobuf message.

        Optional fields are only written when they are set on the chunk.
        Legacy fields (transfer_id, pending_bytes) are written for legacy
        chunks and for every START chunk.

        Returns:
            The encoded transfer_pb2.Chunk.

        Raises:
            AssertionError: If a version-two chunk has both a nonzero
                session_id and a desired_session_id, or if legacy fields are
                being encoded and the chunk has neither a resource_id nor a
                nonzero session_id.
        """
        message = transfer_pb2.Chunk(
            offset=self.offset,
            window_end_offset=self.window_end_offset,
            type=self.type,
        )

        if self.resource_id is not None:
            message.resource_id = self.resource_id

        if self.protocol_version is ProtocolVersion.VERSION_TWO:
            if self.session_id != 0:
                assert self.desired_session_id is None
                message.session_id = self.session_id

            if self.desired_session_id is not None:
                message.desired_session_id = self.desired_session_id

        if self._should_encode_legacy_fields():
            if self.resource_id is not None:
                message.transfer_id = self.resource_id
            else:
                assert self.session_id != 0
                message.transfer_id = self.session_id

            # In the legacy protocol, the pending_bytes field must be set
            # alongside window_end_offset, as some transfer implementations
            # require it.
            if self.window_end_offset != 0:
                message.pending_bytes = self.window_end_offset - self.offset

        if self.data:
            message.data = self.data

        if self.remaining_bytes is not None:
            message.remaining_bytes = self.remaining_bytes

        if self.max_chunk_size_bytes is not None:
            message.max_chunk_size_bytes = self.max_chunk_size_bytes

        if self.min_delay_microseconds is not None:
            message.min_delay_microseconds = self.min_delay_microseconds

        if self.status is not None:
            message.status = self.status.value

        if self._is_initial_handshake_chunk():
            # During the initial handshake, the desired protocol version is
            # explicitly encoded.
            message.protocol_version = self.protocol_version.value

        message.initial_offset = self.initial_offset

        return message

    def id(self) -> int:
        """Returns the transfer context identifier for a chunk.

        Depending on the protocol version and type of chunk, this may correspond
        to one of several proto fields.
        """
        return self.session_id

    def requests_transmission_from_offset(self) -> bool:
        """Returns True if this chunk is requesting a retransmission.

        This is the case for PARAMETERS_RETRANSMIT, START, and
        START_ACK_CONFIRMATION chunks.
        """
        # Generated protobuf enum values are plain ints, so chunk types are
        # compared by equality rather than identity.
        return self.type in (
            Chunk.Type.PARAMETERS_RETRANSMIT,
            Chunk.Type.START,
            Chunk.Type.START_ACK_CONFIRMATION,
        )

    def _is_initial_handshake_chunk(self) -> bool:
        """Returns True for a version-two START, START_ACK, or
        START_ACK_CONFIRMATION chunk."""
        return (
            self.protocol_version is ProtocolVersion.VERSION_TWO
            and self.type
            in (
                Chunk.Type.START,
                Chunk.Type.START_ACK,
                Chunk.Type.START_ACK_CONFIRMATION,
            )
        )

    def _should_encode_legacy_fields(self) -> bool:
        """Returns True for legacy chunks and for START chunks of any version."""
        return (
            self.protocol_version is ProtocolVersion.LEGACY
            or self.type == Chunk.Type.START
        )
289