1#!/usr/bin/env python3 2 3# Copyright 2023 Google LLC 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# https://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import asyncio 18import argparse 19import logging 20from pica import Host 21from pica.packets import uci 22from .helper import init 23from pathlib import Path 24 25MAX_DATA_PACKET_PAYLOAD_SIZE = 1024 26 27 28async def controller(host: Host, peer: Host, file: Path): 29 await init(host) 30 31 host.send_control( 32 uci.SessionInitCmd( 33 session_id=0, 34 session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION, 35 ) 36 ) 37 38 await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) 39 40 await host.expect_control( 41 uci.SessionStatusNtf( 42 session_token=0, 43 session_state=uci.SessionState.SESSION_STATE_INIT, 44 reason_code=0, 45 ) 46 ) 47 48 ranging_round_usage = 0x06 49 ranging_duration = int(1000).to_bytes(4, byteorder="little") 50 51 host.send_control( 52 uci.SessionSetAppConfigCmd( 53 session_token=0, 54 tlvs=[ 55 uci.AppConfigTlv( 56 cfg_id=uci.AppConfigTlvType.DEVICE_ROLE, 57 v=bytes([uci.DeviceRole.INITIATOR]), 58 ), 59 uci.AppConfigTlv( 60 cfg_id=uci.AppConfigTlvType.DEVICE_TYPE, 61 v=bytes([uci.DeviceType.CONTROLLER]), 62 ), 63 uci.AppConfigTlv( 64 cfg_id=uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, v=host.mac_address 65 ), 66 uci.AppConfigTlv( 67 cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE, 68 v=bytes([uci.MacAddressMode.MODE_0]), 69 ), 70 uci.AppConfigTlv( 71 cfg_id=uci.AppConfigTlvType.MULTI_NODE_MODE, 72 v=bytes([uci.MultiNodeMode.ONE_TO_ONE]), 73 ), 74 uci.AppConfigTlv( 75 cfg_id=uci.AppConfigTlvType.SCHEDULE_MODE, 76 v=bytes([uci.ScheduleMode.CONTENTION_BASED]), 77 ), 78 uci.AppConfigTlv( 79 cfg_id=uci.AppConfigTlvType.RANGING_ROUND_USAGE, 80 v=bytes([ranging_round_usage]), 81 ), 82 uci.AppConfigTlv( 83 cfg_id=uci.AppConfigTlvType.RANGING_DURATION, v=ranging_duration 84 ), 85 uci.AppConfigTlv( 86 cfg_id=uci.AppConfigTlvType.NUMBER_OF_CONTROLEES, v=bytes([1]) 87 ), 88 uci.AppConfigTlv( 89 cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS, v=peer.mac_address 90 ), 91 ], 92 ) 93 ) 94 95 await host.expect_control( 96 uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) 97 ) 98 99 await host.expect_control( 100 uci.SessionStatusNtf( 101 session_token=0, 102 session_state=uci.SessionState.SESSION_STATE_IDLE, 103 reason_code=0, 104 ) 105 ) 106 107 await data_transfer(host, peer.mac_address, file, 0) 108 109 # START SESSION CMD 110 host.send_control(uci.SessionStartCmd(session_id=0)) 111 112 await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) 113 114 await host.expect_control( 115 uci.SessionStatusNtf( 116 session_token=0, 117 session_state=uci.SessionState.SESSION_STATE_ACTIVE, 118 reason_code=0, 119 ) 120 ) 121 122 await host.expect_control( 123 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) 124 ) 125 126 event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) 127 event.show() 128 129 event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) 130 event.show() 131 132 # STOP SESSION 133 host.send_control(uci.SessionStopCmd(session_id=0)) 134 135 await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) 136 137 await host.expect_control( 138 uci.SessionStatusNtf( 139 session_token=0, 140 session_state=uci.SessionState.SESSION_STATE_IDLE, 141 reason_code=0, 142 ) 143 ) 144 145 await host.expect_control( 146 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) 147 ) 148 149 # DEINIT 150 host.send_control(uci.SessionDeinitCmd(session_token=0)) 151 152 await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) 153 154 155async def controlee(host: Host, peer: Host, file: Path): 156 await init(host) 157 158 host.send_control( 159 uci.SessionInitCmd( 160 session_id=0, 161 session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION, 162 ) 163 ) 164 165 await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) 166 167 await host.expect_control( 168 uci.SessionStatusNtf( 169 session_token=0, 170 session_state=uci.SessionState.SESSION_STATE_INIT, 171 reason_code=0, 172 ) 173 ) 174 175 ranging_round_usage = 0x06 176 ranging_duration = int(1000).to_bytes(4, byteorder="little") 177 178 host.send_control( 179 uci.SessionSetAppConfigCmd( 180 session_token=0, 181 tlvs=[ 182 uci.AppConfigTlv( 183 cfg_id=uci.AppConfigTlvType.DEVICE_ROLE, 184 v=bytes([uci.DeviceRole.RESPONDER]), 185 ), 186 uci.AppConfigTlv( 187 cfg_id=uci.AppConfigTlvType.DEVICE_TYPE, 188 v=bytes([uci.DeviceType.CONTROLEE]), 189 ), 190 uci.AppConfigTlv( 191 cfg_id=uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, v=host.mac_address 192 ), 193 uci.AppConfigTlv( 194 cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE, 195 v=bytes([uci.MacAddressMode.MODE_0]), 196 ), 197 uci.AppConfigTlv( 198 cfg_id=uci.AppConfigTlvType.MULTI_NODE_MODE, 199 v=bytes([uci.MultiNodeMode.ONE_TO_ONE]), 200 ), 201 uci.AppConfigTlv( 202 cfg_id=uci.AppConfigTlvType.SCHEDULE_MODE, 203 v=bytes([uci.ScheduleMode.CONTENTION_BASED]), 204 ), 205 uci.AppConfigTlv( 206 cfg_id=uci.AppConfigTlvType.RANGING_ROUND_USAGE, 207 v=bytes([ranging_round_usage]), 208 ), 209 uci.AppConfigTlv( 210 cfg_id=uci.AppConfigTlvType.RANGING_DURATION, v=ranging_duration 211 ), 212 uci.AppConfigTlv( 213 cfg_id=uci.AppConfigTlvType.NUMBER_OF_CONTROLEES, v=bytes([1]) 214 ), 215 uci.AppConfigTlv( 216 cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS, v=peer.mac_address 217 ), 218 ], 219 ) 220 ) 221 222 await host.expect_control( 223 uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) 224 ) 225 226 await host.expect_control( 227 uci.SessionStatusNtf( 228 session_token=0, 229 session_state=uci.SessionState.SESSION_STATE_IDLE, 230 reason_code=0, 231 ) 232 ) 233 234 host.send_control(uci.SessionStartCmd(session_id=0)) 235 236 await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) 237 238 await host.expect_control( 239 uci.SessionStatusNtf( 240 session_token=0, 241 session_state=uci.SessionState.SESSION_STATE_ACTIVE, 242 reason_code=0, 243 ) 244 ) 245 246 await host.expect_control( 247 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) 248 ) 249 250 with file.open("rb") as f: 251 application_data = list(bytearray(f.read())) 252 event = await host.expect_data( 253 uci.DataMessageRcv( 254 session_handle=0, 255 status=uci.Status.OK, 256 source_address=int.from_bytes(peer.mac_address, "little"), 257 data_sequence_number=0x01, 258 application_data=application_data, 259 ), 260 timeout=2.0, 261 ) 262 event.show() 263 264 event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) 265 event.show() 266 267 host.send_control(uci.SessionStopCmd(session_id=0)) 268 269 await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) 270 271 await host.expect_control( 272 uci.SessionStatusNtf( 273 session_token=0, 274 session_state=uci.SessionState.SESSION_STATE_IDLE, 275 reason_code=0, 276 ) 277 ) 278 279 await host.expect_control( 280 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) 281 ) 282 283 host.send_control(uci.SessionDeinitCmd(session_token=0)) 284 285 await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) 286 287 288async def data_transfer( 289 host: Host, dst_mac_address: bytes, file: Path, session_id: int 290): 291 try: 292 with open(file, "rb") as f: 293 b = f.read() 294 seq_num = 0 295 296 if len(b) > MAX_DATA_PACKET_PAYLOAD_SIZE: 297 for i in range(0, len(b), MAX_DATA_PACKET_PAYLOAD_SIZE): 298 chunk = b[i : i + MAX_DATA_PACKET_PAYLOAD_SIZE] 299 300 if i + MAX_DATA_PACKET_PAYLOAD_SIZE >= len(b): 301 host.send_data( 302 uci.DataMessageSnd( 303 session_handle=int(session_id), 304 destination_address=int.from_bytes(dst_mac_address), 305 data_sequence_number=seq_num, 306 application_data=chunk, 307 ) 308 ) 309 else: 310 host.send_data( 311 uci.DataMessageSnd( 312 session_handle=int(session_id), 313 pbf=uci.PacketBoundaryFlag.NOT_COMPLETE, 314 destination_address=int.from_bytes(dst_mac_address), 315 data_sequence_number=seq_num, 316 application_data=chunk, 317 ) 318 ) 319 320 seq_num += 1 321 if seq_num >= 65535: 322 seq_num = 0 323 324 event = await host.expect_control( 325 uci.SessionDataCreditNtf( 326 session_token=int(session_id), 327 credit_availability=uci.CreditAvailability.CREDIT_AVAILABLE, 328 ) 329 ) 330 event.show() 331 else: 332 host.send_data( 333 uci.DataMessageSnd( 334 session_handle=int(session_id), 335 destination_address=int.from_bytes(dst_mac_address), 336 data_sequence_number=seq_num, 337 application_data=b, 338 ) 339 ) 340 event = await host.expect_control( 341 uci.SessionDataCreditNtf( 342 session_token=int(session_id), 343 credit_availability=uci.CreditAvailability.CREDIT_AVAILABLE, 344 ) 345 ) 346 event.show() 347 348 except Exception as e: 349 print(e) 350 351 352async def run(address: str, uci_port: int, file: Path): 353 try: 354 host0 = await Host.connect(address, uci_port, bytes([0x34, 0x12])) 355 host1 = await Host.connect(address, uci_port, bytes([0x78, 0x56])) 356 except Exception as e: 357 raise Exception( 358 f"Failed to connect to Pica server at address {address}:{uci_port}\n" 359 + "Make sure the server is running" 360 ) 361 362 try: 363 async with asyncio.TaskGroup() as tg: 364 tg.create_task(controller(host0, host1, file)) 365 tg.create_task(controlee(host1, host0, file)) 366 except Exception as e: 367 raise e 368 finally: 369 host0.disconnect() 370 host1.disconnect() 371 logging.debug("Data transfer test completed") 372 373 374def main(): 375 """Start a Pica interactive console.""" 376 parser = argparse.ArgumentParser(description=__doc__) 377 parser.add_argument( 378 "--address", 379 type=str, 380 default="127.0.0.1", 381 help="Select the pica server address", 382 ) 383 parser.add_argument( 384 "--uci-port", type=int, default=7000, help="Select the pica TCP UCI port" 385 ) 386 parser.add_argument( 387 "--file", type=Path, required=True, help="Select the file to transfer" 388 ) 389 asyncio.run(run(**vars(parser.parse_args()))) 390 391 392if __name__ == "__main__": 393 main() 394