1#!/usr/bin/env python3 2 3# Copyright 2022 Google LLC 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# https://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import asyncio 18import argparse 19import logging 20 21from pica import Host 22from pica.packets import uci 23from .helper import init 24 25 26async def controller(host: Host, peer: Host): 27 await init(host) 28 29 host.send_control( 30 uci.SessionInitCmd( 31 session_id=0, session_type=uci.SessionType.FIRA_RANGING_SESSION 32 ) 33 ) 34 35 await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) 36 37 await host.expect_control( 38 uci.SessionStatusNtf( 39 session_token=0, 40 session_state=uci.SessionState.SESSION_STATE_INIT, 41 reason_code=0, 42 ) 43 ) 44 45 ranging_round_usage = 0x06 46 ranging_duration = int(1000).to_bytes(4, byteorder="little") 47 48 host.send_control( 49 uci.SessionSetAppConfigCmd( 50 session_token=0, 51 tlvs=[ 52 uci.AppConfigTlv( 53 cfg_id=uci.AppConfigTlvType.DEVICE_ROLE, 54 v=bytes([uci.DeviceRole.INITIATOR]), 55 ), 56 uci.AppConfigTlv( 57 cfg_id=uci.AppConfigTlvType.DEVICE_TYPE, 58 v=bytes([uci.DeviceType.CONTROLLER]), 59 ), 60 uci.AppConfigTlv( 61 cfg_id=uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, v=host.mac_address 62 ), 63 uci.AppConfigTlv( 64 cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE, 65 v=bytes([uci.MacAddressMode.MODE_0]), 66 ), 67 uci.AppConfigTlv( 68 cfg_id=uci.AppConfigTlvType.MULTI_NODE_MODE, 69 v=bytes([uci.MultiNodeMode.ONE_TO_ONE]), 70 ), 71 uci.AppConfigTlv( 72 cfg_id=uci.AppConfigTlvType.SCHEDULE_MODE, 73 v=bytes([uci.ScheduleMode.CONTENTION_BASED]), 74 ), 75 uci.AppConfigTlv( 76 cfg_id=uci.AppConfigTlvType.RANGING_ROUND_USAGE, 77 v=bytes([ranging_round_usage]), 78 ), 79 uci.AppConfigTlv( 80 cfg_id=uci.AppConfigTlvType.RANGING_DURATION, v=ranging_duration 81 ), 82 uci.AppConfigTlv( 83 cfg_id=uci.AppConfigTlvType.NUMBER_OF_CONTROLEES, v=bytes([1]) 84 ), 85 uci.AppConfigTlv( 86 cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS, v=peer.mac_address 87 ), 88 ], 89 ) 90 ) 91 92 await host.expect_control( 93 uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) 94 ) 95 96 await host.expect_control( 97 uci.SessionStatusNtf( 98 session_token=0, 99 session_state=uci.SessionState.SESSION_STATE_IDLE, 100 reason_code=0, 101 ) 102 ) 103 104 host.send_control(uci.SessionStartCmd(session_id=0)) 105 106 await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) 107 108 await host.expect_control( 109 uci.SessionStatusNtf( 110 session_token=0, 111 session_state=uci.SessionState.SESSION_STATE_ACTIVE, 112 reason_code=0, 113 ) 114 ) 115 116 await host.expect_control( 117 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) 118 ) 119 120 for _ in range(1, 3): 121 event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) 122 event.show() 123 124 host.send_control(uci.SessionStopCmd(session_id=0)) 125 126 await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) 127 128 await host.expect_control( 129 uci.SessionStatusNtf( 130 session_token=0, 131 session_state=uci.SessionState.SESSION_STATE_IDLE, 132 reason_code=0, 133 ) 134 ) 135 136 await host.expect_control( 137 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) 138 ) 139 140 host.send_control(uci.SessionDeinitCmd(session_token=0)) 141 142 await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) 143 144 145async def controlee(host: Host, peer: Host): 146 await init(host) 147 148 host.send_control( 149 uci.SessionInitCmd( 150 session_id=0, session_type=uci.SessionType.FIRA_RANGING_SESSION 151 ) 152 ) 153 154 await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) 155 156 await host.expect_control( 157 uci.SessionStatusNtf( 158 session_token=0, 159 session_state=uci.SessionState.SESSION_STATE_INIT, 160 reason_code=0, 161 ) 162 ) 163 164 ranging_round_usage = 0x06 165 ranging_duration = int(1000).to_bytes(4, byteorder="little") 166 167 host.send_control( 168 uci.SessionSetAppConfigCmd( 169 session_token=0, 170 tlvs=[ 171 uci.AppConfigTlv( 172 cfg_id=uci.AppConfigTlvType.DEVICE_ROLE, 173 v=bytes([uci.DeviceRole.RESPONDER]), 174 ), 175 uci.AppConfigTlv( 176 cfg_id=uci.AppConfigTlvType.DEVICE_TYPE, 177 v=bytes([uci.DeviceType.CONTROLEE]), 178 ), 179 uci.AppConfigTlv( 180 cfg_id=uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, v=host.mac_address 181 ), 182 uci.AppConfigTlv( 183 cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE, 184 v=bytes([uci.MacAddressMode.MODE_0]), 185 ), 186 uci.AppConfigTlv( 187 cfg_id=uci.AppConfigTlvType.MULTI_NODE_MODE, 188 v=bytes([uci.MultiNodeMode.ONE_TO_ONE]), 189 ), 190 uci.AppConfigTlv( 191 cfg_id=uci.AppConfigTlvType.SCHEDULE_MODE, 192 v=bytes([uci.ScheduleMode.CONTENTION_BASED]), 193 ), 194 uci.AppConfigTlv( 195 cfg_id=uci.AppConfigTlvType.RANGING_ROUND_USAGE, 196 v=bytes([ranging_round_usage]), 197 ), 198 uci.AppConfigTlv( 199 cfg_id=uci.AppConfigTlvType.RANGING_DURATION, v=ranging_duration 200 ), 201 uci.AppConfigTlv( 202 cfg_id=uci.AppConfigTlvType.NUMBER_OF_CONTROLEES, v=bytes([1]) 203 ), 204 uci.AppConfigTlv( 205 cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS, v=peer.mac_address 206 ), 207 ], 208 ) 209 ) 210 211 await host.expect_control( 212 uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) 213 ) 214 215 await host.expect_control( 216 uci.SessionStatusNtf( 217 session_token=0, 218 session_state=uci.SessionState.SESSION_STATE_IDLE, 219 reason_code=0, 220 ) 221 ) 222 223 host.send_control(uci.SessionStartCmd(session_id=0)) 224 225 await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) 226 227 await host.expect_control( 228 uci.SessionStatusNtf( 229 session_token=0, 230 session_state=uci.SessionState.SESSION_STATE_ACTIVE, 231 reason_code=0, 232 ) 233 ) 234 235 await host.expect_control( 236 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) 237 ) 238 239 for _ in range(1, 3): 240 event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) 241 event.show() 242 243 host.send_control(uci.SessionStopCmd(session_id=0)) 244 245 await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) 246 247 await host.expect_control( 248 uci.SessionStatusNtf( 249 session_token=0, 250 session_state=uci.SessionState.SESSION_STATE_IDLE, 251 reason_code=0, 252 ) 253 ) 254 255 await host.expect_control( 256 uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) 257 ) 258 259 host.send_control(uci.SessionDeinitCmd(session_token=0)) 260 261 await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) 262 263 264async def run(address: str, uci_port: int): 265 try: 266 host0 = await Host.connect(address, uci_port, bytes([0, 1])) 267 host1 = await Host.connect(address, uci_port, bytes([0, 2])) 268 except Exception: 269 logging.debug( 270 f"Failed to connect to Pica server at address {address}:{uci_port}\n" 271 + "Make sure the server is running" 272 ) 273 exit(1) 274 275 async with asyncio.TaskGroup() as tg: 276 tg.create_task(controller(host0, host1)) 277 tg.create_task(controlee(host1, host0)) 278 279 host0.disconnect() 280 host1.disconnect() 281 282 logging.debug("Ranging test completed") 283 284 285def main(): 286 """Start a Pica interactive console.""" 287 parser = argparse.ArgumentParser(description=__doc__) 288 parser.add_argument( 289 "--address", 290 type=str, 291 default="127.0.0.1", 292 help="Select the pica server address", 293 ) 294 parser.add_argument( 295 "--uci-port", type=int, default=7000, help="Select the pica TCP UCI port" 296 ) 297 asyncio.run(run(**vars(parser.parse_args()))) 298 299 300if __name__ == "__main__": 301 main() 302