1#!/usr/bin/env python3
2
3# Copyright 2023 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     https://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import asyncio
18import argparse
19import logging
20from pica import Host
21from pica.packets import uci
22from .helper import init
23from pathlib import Path
24
# Largest number of payload bytes `data_transfer` puts into a single data
# packet; longer files are split into chunks of this size.
25MAX_DATA_PACKET_PAYLOAD_SIZE = 1024
26
27
async def controller(host: Host, peer: Host, file: Path):
    """Run the controller side of the data transfer scenario on `host`.

    The host is configured as initiator/controller of session 0 with `peer`
    as its single destination. Once the session is idle the content of
    `file` is handed to `data_transfer`; the session is then started, two
    ranging notifications are awaited, and the session is stopped and
    deinitialized.

    Args:
        host: Connection used to send commands and await responses.
        peer: The other host; only its `mac_address` is read here.
        file: Path of the file whose content is transferred.
    """
    session = 0
    ok = uci.Status.OK
    state = uci.SessionState
    tag = uci.AppConfigTlvType

    def session_status(expected_state):
        # Session status notification expected for this session.
        return uci.SessionStatusNtf(
            session_token=session,
            session_state=expected_state,
            reason_code=0,
        )

    await init(host)

    # Create the session and wait for it to report the INIT state.
    host.send_control(
        uci.SessionInitCmd(
            session_id=session,
            session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION,
        )
    )
    await host.expect_control(uci.SessionInitRsp(status=ok))
    await host.expect_control(session_status(state.SESSION_STATE_INIT))

    # Application configuration, listed in the order it is sent.
    settings = [
        (tag.DEVICE_ROLE, bytes([uci.DeviceRole.INITIATOR])),
        (tag.DEVICE_TYPE, bytes([uci.DeviceType.CONTROLLER])),
        (tag.DEVICE_MAC_ADDRESS, host.mac_address),
        (tag.MAC_ADDRESS_MODE, bytes([uci.MacAddressMode.MODE_0])),
        (tag.MULTI_NODE_MODE, bytes([uci.MultiNodeMode.ONE_TO_ONE])),
        (tag.SCHEDULE_MODE, bytes([uci.ScheduleMode.CONTENTION_BASED])),
        (tag.RANGING_ROUND_USAGE, bytes([0x06])),
        (tag.RANGING_DURATION, (1000).to_bytes(4, byteorder="little")),
        (tag.NUMBER_OF_CONTROLEES, bytes([1])),
        (tag.DST_MAC_ADDRESS, peer.mac_address),
    ]
    host.send_control(
        uci.SessionSetAppConfigCmd(
            session_token=session,
            tlvs=[
                uci.AppConfigTlv(cfg_id=cfg_id, v=value)
                for cfg_id, value in settings
            ],
        )
    )
    await host.expect_control(
        uci.SessionSetAppConfigRsp(status=ok, cfg_status=[])
    )
    await host.expect_control(session_status(state.SESSION_STATE_IDLE))

    # The payload is queued before the session is started.
    await data_transfer(host, peer.mac_address, file, session)

    # Start the session and wait until both session and device are active.
    host.send_control(uci.SessionStartCmd(session_id=session))
    await host.expect_control(uci.SessionStartRsp(status=ok))
    await host.expect_control(session_status(state.SESSION_STATE_ACTIVE))
    await host.expect_control(
        uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
    )

    # Two ranging reports are expected before the session is stopped.
    for _ in range(2):
        report = await host.expect_control(
            uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0
        )
        report.show()

    # Stop the session; session goes back to IDLE and the device to READY.
    host.send_control(uci.SessionStopCmd(session_id=session))
    await host.expect_control(uci.SessionStopRsp(status=ok))
    await host.expect_control(session_status(state.SESSION_STATE_IDLE))
    await host.expect_control(
        uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
    )

    # Release the session.
    host.send_control(uci.SessionDeinitCmd(session_token=session))
    await host.expect_control(uci.SessionDeinitRsp(status=ok))
153
154
async def controlee(host: Host, peer: Host, file: Path):
    """Run the controlee side of the data transfer scenario on `host`.

    The host is configured as responder/controlee of session 0 with `peer`
    as its destination and the session is started. The function then waits
    for a data message coming from `peer` whose payload equals the content
    of `file`, waits for one ranging notification, and finally stops and
    deinitializes the session.

    Args:
        host: Connection used to send commands and await responses.
        peer: The other host; only its `mac_address` is read here.
        file: Path of the file whose content is expected to be received.
    """
    session = 0
    ok = uci.Status.OK
    state = uci.SessionState
    tag = uci.AppConfigTlvType

    def session_status(expected_state):
        # Session status notification expected for this session.
        return uci.SessionStatusNtf(
            session_token=session,
            session_state=expected_state,
            reason_code=0,
        )

    await init(host)

    # Create the session and wait for it to report the INIT state.
    host.send_control(
        uci.SessionInitCmd(
            session_id=session,
            session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION,
        )
    )
    await host.expect_control(uci.SessionInitRsp(status=ok))
    await host.expect_control(session_status(state.SESSION_STATE_INIT))

    # Application configuration, listed in the order it is sent.
    settings = [
        (tag.DEVICE_ROLE, bytes([uci.DeviceRole.RESPONDER])),
        (tag.DEVICE_TYPE, bytes([uci.DeviceType.CONTROLEE])),
        (tag.DEVICE_MAC_ADDRESS, host.mac_address),
        (tag.MAC_ADDRESS_MODE, bytes([uci.MacAddressMode.MODE_0])),
        (tag.MULTI_NODE_MODE, bytes([uci.MultiNodeMode.ONE_TO_ONE])),
        (tag.SCHEDULE_MODE, bytes([uci.ScheduleMode.CONTENTION_BASED])),
        (tag.RANGING_ROUND_USAGE, bytes([0x06])),
        (tag.RANGING_DURATION, (1000).to_bytes(4, byteorder="little")),
        (tag.NUMBER_OF_CONTROLEES, bytes([1])),
        (tag.DST_MAC_ADDRESS, peer.mac_address),
    ]
    host.send_control(
        uci.SessionSetAppConfigCmd(
            session_token=session,
            tlvs=[
                uci.AppConfigTlv(cfg_id=cfg_id, v=value)
                for cfg_id, value in settings
            ],
        )
    )
    await host.expect_control(
        uci.SessionSetAppConfigRsp(status=ok, cfg_status=[])
    )
    await host.expect_control(session_status(state.SESSION_STATE_IDLE))

    # Start the session and wait until both session and device are active.
    host.send_control(uci.SessionStartCmd(session_id=session))
    await host.expect_control(uci.SessionStartRsp(status=ok))
    await host.expect_control(session_status(state.SESSION_STATE_ACTIVE))
    await host.expect_control(
        uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
    )

    # Read the reference payload and close the file *before* waiting on the
    # network, so no file handle is held open across the await below.
    application_data = list(file.read_bytes())

    # NOTE(review): the sender in this file numbers its first packet 0 while
    # 0x01 is expected here; presumably the server assigns its own sequence
    # number. Confirm against the server implementation.
    event = await host.expect_data(
        uci.DataMessageRcv(
            session_handle=session,
            status=ok,
            source_address=int.from_bytes(peer.mac_address, "little"),
            data_sequence_number=0x01,
            application_data=application_data,
        ),
        timeout=2.0,
    )
    event.show()

    event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
    event.show()

    # Stop the session; session goes back to IDLE and the device to READY.
    host.send_control(uci.SessionStopCmd(session_id=session))
    await host.expect_control(uci.SessionStopRsp(status=ok))
    await host.expect_control(session_status(state.SESSION_STATE_IDLE))
    await host.expect_control(
        uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
    )

    # Release the session.
    host.send_control(uci.SessionDeinitCmd(session_token=session))
    await host.expect_control(uci.SessionDeinitRsp(status=ok))
286
287
async def data_transfer(
    host: Host, dst_mac_address: bytes, file: Path, session_id: int
):
    """Send the content of `file` to `dst_mac_address` as data packets.

    The file is split into chunks of at most MAX_DATA_PACKET_PAYLOAD_SIZE
    bytes. Every chunk except the last one is flagged NOT_COMPLETE; an empty
    file still produces a single empty packet. After each packet the function
    waits for a credit notification (CREDIT_AVAILABLE) for the session before
    sending the next one.

    Failures (unreadable file, unexpected or missing notification, ...) are
    logged with their traceback and are NOT propagated to the caller.

    Args:
        host: Connection used to send the packets and await notifications.
        dst_mac_address: Destination address as raw bytes.
        file: Path of the file to send.
        session_id: Identifier of the session the data belongs to.
    """
    try:
        # Read everything up front so the file is closed before any network
        # exchange starts.
        with open(file, "rb") as f:
            payload = f.read()

        session = int(session_id)
        # "big" is what int.from_bytes defaults to when byteorder is omitted
        # (Python 3.11+); spelled out so the choice is visible.
        # NOTE(review): controlee() decodes the peer address as little-endian;
        # confirm which byte order the server expects here.
        destination = int.from_bytes(dst_mac_address, "big")

        total = len(payload)
        # An empty range is falsy: fall back to one offset so that an empty
        # file is still sent as a single empty packet.
        offsets = range(0, total, MAX_DATA_PACKET_PAYLOAD_SIZE) or range(1)
        seq_num = 0

        for start in offsets:
            end = start + MAX_DATA_PACKET_PAYLOAD_SIZE
            fields = {
                "session_handle": session,
                "destination_address": destination,
                "data_sequence_number": seq_num,
                "application_data": payload[start:end],
            }
            if end < total:
                # More chunks follow; the final chunk keeps the packet's
                # default boundary flag.
                fields["pbf"] = uci.PacketBoundaryFlag.NOT_COMPLETE
            host.send_data(uci.DataMessageSnd(**fields))

            # NOTE(review): wraps after 65534, so 65535 is never used as a
            # sequence number; confirm this is intended.
            seq_num += 1
            if seq_num >= 65535:
                seq_num = 0

            event = await host.expect_control(
                uci.SessionDataCreditNtf(
                    session_token=session,
                    credit_availability=uci.CreditAvailability.CREDIT_AVAILABLE,
                )
            )
            event.show()

    except Exception:
        # Best effort, as before: report the failure and let the caller go on.
        # The traceback is logged because str(e) is empty for many exceptions.
        # NOTE(review): consider re-raising so a failed transfer fails the run.
        logging.exception("Data transfer failed")
350
351
async def run(address: str, uci_port: int, file: Path):
    """Connect two hosts to the Pica server and run both sides concurrently.

    Two connections are opened to `address:uci_port`, each given a distinct
    two-byte identifier (presumably its MAC address; `controller` and
    `controlee` read it back as `mac_address`). The first host runs
    `controller`, the second runs `controlee`. Both connections are closed
    when the tasks finish, whether they succeeded or not.

    Args:
        address: Address of the Pica server.
        uci_port: TCP port of the server's UCI interface.
        file: Path of the file to transfer.

    Raises:
        Exception: If either connection cannot be established; the original
            error is attached as the cause.
    """
    host0 = None
    try:
        host0 = await Host.connect(address, uci_port, bytes([0x34, 0x12]))
        host1 = await Host.connect(address, uci_port, bytes([0x78, 0x56]))
    except Exception as e:
        # Do not leak the first connection when the second one fails.
        if host0 is not None:
            host0.disconnect()
        raise Exception(
            f"Failed to connect to Pica server at address {address}:{uci_port}\n"
            + "Make sure the server is running"
        ) from e

    try:
        # The task group waits for both sides and propagates their failures.
        async with asyncio.TaskGroup() as tg:
            tg.create_task(controller(host0, host1, file))
            tg.create_task(controlee(host1, host0, file))
    finally:
        host0.disconnect()
        host1.disconnect()
        logging.debug("Data transfer test completed")
372
373
def main():
    """Parse the command line and run the data transfer example."""
    # The module has no docstring, so `description=__doc__` left the help
    # text without a description; state it explicitly instead.
    parser = argparse.ArgumentParser(
        description="Transfer a file between two hosts connected to a Pica server"
    )
    parser.add_argument(
        "--address",
        type=str,
        default="127.0.0.1",
        help="Select the pica server address",
    )
    parser.add_argument(
        "--uci-port", type=int, default=7000, help="Select the pica TCP UCI port"
    )
    parser.add_argument(
        "--file", type=Path, required=True, help="Select the file to transfer"
    )
    # Option names map one-to-one onto the parameters of run().
    asyncio.run(run(**vars(parser.parse_args())))
390
391
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
392if __name__ == "__main__":
393    main()
394