1#!/usr/bin/env python3
2
3# Copyright 2022 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     https://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import asyncio
18import argparse
19import logging
20
21from pica import Host
22from pica.packets import uci
23from .helper import init
24
25
async def controller(host: Host, peer: Host):
    """Run the controller side of the ranging test on *host*.

    The exchange is strictly sequential: initialize the host, create session
    0, push the application configuration (device type CONTROLLER, device
    role INITIATOR, destination ``peer.mac_address``), start ranging, wait
    for two ``ShortMacTwoWaySessionInfoNtf`` notifications, then stop and
    deinitialize the session. After each command the matching response and
    status notifications are awaited through ``host.expect_control``.

    Args:
        host: Connection the UCI commands are sent on; its ``mac_address``
            is configured as the device MAC address.
        peer: The other test participant; only its ``mac_address`` is read.
    """
    session = 0

    async def expect_session_state(state):
        # Every session state change in this test is reported with reason 0.
        await host.expect_control(
            uci.SessionStatusNtf(
                session_token=session, session_state=state, reason_code=0
            )
        )

    async def expect_device_state(state):
        await host.expect_control(uci.CoreDeviceStatusNtf(device_state=state))

    await init(host)

    # Create the session and wait for it to report the INIT state.
    create_cmd = uci.SessionInitCmd(
        session_id=session, session_type=uci.SessionType.FIRA_RANGING_SESSION
    )
    host.send_control(create_cmd)
    await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK))
    await expect_session_state(uci.SessionState.SESSION_STATE_INIT)

    # Application configuration, listed in the order the TLVs are sent.
    settings = (
        (uci.AppConfigTlvType.DEVICE_ROLE, bytes([uci.DeviceRole.INITIATOR])),
        (uci.AppConfigTlvType.DEVICE_TYPE, bytes([uci.DeviceType.CONTROLLER])),
        (uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, host.mac_address),
        (uci.AppConfigTlvType.MAC_ADDRESS_MODE, bytes([uci.MacAddressMode.MODE_0])),
        (uci.AppConfigTlvType.MULTI_NODE_MODE, bytes([uci.MultiNodeMode.ONE_TO_ONE])),
        (
            uci.AppConfigTlvType.SCHEDULE_MODE,
            bytes([uci.ScheduleMode.CONTENTION_BASED]),
        ),
        (uci.AppConfigTlvType.RANGING_ROUND_USAGE, bytes([0x06])),
        # The duration value 1000 is encoded as 4 little-endian bytes.
        (
            uci.AppConfigTlvType.RANGING_DURATION,
            int(1000).to_bytes(4, byteorder="little"),
        ),
        (uci.AppConfigTlvType.NUMBER_OF_CONTROLEES, bytes([1])),
        (uci.AppConfigTlvType.DST_MAC_ADDRESS, peer.mac_address),
    )
    tlvs = [uci.AppConfigTlv(cfg_id=cfg_id, v=value) for cfg_id, value in settings]
    host.send_control(uci.SessionSetAppConfigCmd(session_token=session, tlvs=tlvs))
    await host.expect_control(
        uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[])
    )
    await expect_session_state(uci.SessionState.SESSION_STATE_IDLE)

    # Start ranging; both the session and the device must become active.
    host.send_control(uci.SessionStartCmd(session_id=session))
    await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK))
    await expect_session_state(uci.SessionState.SESSION_STATE_ACTIVE)
    await expect_device_state(uci.DeviceState.DEVICE_STATE_ACTIVE)

    # Wait for two ranging reports, allowing 2 seconds for each.
    for _ in range(2):
        report = await host.expect_control(
            uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0
        )
        report.show()

    # Stop ranging; the session returns to IDLE and the device to READY.
    host.send_control(uci.SessionStopCmd(session_id=session))
    await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK))
    await expect_session_state(uci.SessionState.SESSION_STATE_IDLE)
    await expect_device_state(uci.DeviceState.DEVICE_STATE_READY)

    # Tear the session down.
    host.send_control(uci.SessionDeinitCmd(session_token=session))
    await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK))
143
144
async def controlee(host: Host, peer: Host):
    """Run the controlee side of the ranging test on *host*.

    Follows the same command sequence as the controller side, but configures
    the session with device type CONTROLEE and device role RESPONDER:
    initialize the host, create session 0, apply the application
    configuration, start ranging, wait for two
    ``ShortMacTwoWaySessionInfoNtf`` notifications, stop, and deinitialize.

    Args:
        host: Connection the UCI commands are sent on; its ``mac_address``
            is configured as the device MAC address.
        peer: The other test participant; only its ``mac_address`` is read,
            as the destination MAC address.
    """
    token = 0
    ok = uci.Status.OK

    def session_ntf(state):
        # Expected notification for a session state change (reason code 0).
        return uci.SessionStatusNtf(
            session_token=token, session_state=state, reason_code=0
        )

    def device_ntf(state):
        # Expected notification for a device state change.
        return uci.CoreDeviceStatusNtf(device_state=state)

    await init(host)

    # --- Session creation -------------------------------------------------
    host.send_control(
        uci.SessionInitCmd(
            session_id=token, session_type=uci.SessionType.FIRA_RANGING_SESSION
        )
    )
    await host.expect_control(uci.SessionInitRsp(status=ok))
    await host.expect_control(session_ntf(uci.SessionState.SESSION_STATE_INIT))

    # --- Application configuration ----------------------------------------
    round_usage = bytes([0x06])
    # The duration value 1000 is encoded as 4 little-endian bytes.
    duration = int(1000).to_bytes(4, byteorder="little")
    tlv_type = uci.AppConfigTlvType
    app_config = [
        uci.AppConfigTlv(cfg_id=tlv_type.DEVICE_ROLE, v=bytes([uci.DeviceRole.RESPONDER])),
        uci.AppConfigTlv(cfg_id=tlv_type.DEVICE_TYPE, v=bytes([uci.DeviceType.CONTROLEE])),
        uci.AppConfigTlv(cfg_id=tlv_type.DEVICE_MAC_ADDRESS, v=host.mac_address),
        uci.AppConfigTlv(
            cfg_id=tlv_type.MAC_ADDRESS_MODE, v=bytes([uci.MacAddressMode.MODE_0])
        ),
        uci.AppConfigTlv(
            cfg_id=tlv_type.MULTI_NODE_MODE, v=bytes([uci.MultiNodeMode.ONE_TO_ONE])
        ),
        uci.AppConfigTlv(
            cfg_id=tlv_type.SCHEDULE_MODE,
            v=bytes([uci.ScheduleMode.CONTENTION_BASED]),
        ),
        uci.AppConfigTlv(cfg_id=tlv_type.RANGING_ROUND_USAGE, v=round_usage),
        uci.AppConfigTlv(cfg_id=tlv_type.RANGING_DURATION, v=duration),
        uci.AppConfigTlv(cfg_id=tlv_type.NUMBER_OF_CONTROLEES, v=bytes([1])),
        uci.AppConfigTlv(cfg_id=tlv_type.DST_MAC_ADDRESS, v=peer.mac_address),
    ]
    host.send_control(
        uci.SessionSetAppConfigCmd(session_token=token, tlvs=app_config)
    )
    await host.expect_control(uci.SessionSetAppConfigRsp(status=ok, cfg_status=[]))
    await host.expect_control(session_ntf(uci.SessionState.SESSION_STATE_IDLE))

    # --- Ranging start ----------------------------------------------------
    host.send_control(uci.SessionStartCmd(session_id=token))
    await host.expect_control(uci.SessionStartRsp(status=ok))
    await host.expect_control(session_ntf(uci.SessionState.SESSION_STATE_ACTIVE))
    await host.expect_control(device_ntf(uci.DeviceState.DEVICE_STATE_ACTIVE))

    # --- Two ranging reports, each awaited for at most 2 seconds ----------
    remaining = 2
    while remaining > 0:
        measurement = await host.expect_control(
            uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0
        )
        measurement.show()
        remaining -= 1

    # --- Ranging stop -----------------------------------------------------
    host.send_control(uci.SessionStopCmd(session_id=token))
    await host.expect_control(uci.SessionStopRsp(status=ok))
    await host.expect_control(session_ntf(uci.SessionState.SESSION_STATE_IDLE))
    await host.expect_control(device_ntf(uci.DeviceState.DEVICE_STATE_READY))

    # --- Session teardown -------------------------------------------------
    host.send_control(uci.SessionDeinitCmd(session_token=token))
    await host.expect_control(uci.SessionDeinitRsp(status=ok))
262
263
async def run(address: str, uci_port: int):
    """Connect two hosts to a Pica server and run the ranging test.

    Two connections are opened to the same server, with MAC addresses
    ``00:01`` and ``00:02``. The first runs the controller side and the
    second the controlee side, concurrently. Both connections are closed
    when the test ends, whether it succeeded or failed.

    Args:
        address: Address of the Pica server.
        uci_port: TCP port of the server's UCI interface.

    Raises:
        SystemExit: With status 1 when a connection cannot be established.
    """
    hosts = []
    try:
        try:
            # Connect one after the other so that a failure on the second
            # connection still leaves the first one tracked for cleanup.
            for mac_address in (bytes([0, 1]), bytes([0, 2])):
                hosts.append(await Host.connect(address, uci_port, mac_address))
        except Exception:
            # Logged at ERROR so the reason for the non-zero exit status is
            # visible even when logging has not been configured.
            logging.error(
                "Failed to connect to Pica server at address %s:%s\n"
                "Make sure the server is running",
                address,
                uci_port,
            )
            # SystemExit is raised directly: the exit() builtin is injected
            # by the site module and is not guaranteed to exist.
            raise SystemExit(1) from None

        host0, host1 = hosts

        async with asyncio.TaskGroup() as tg:
            tg.create_task(controller(host0, host1))
            tg.create_task(controlee(host1, host0))
    finally:
        # Close every connection that was opened, including on failure.
        for host in hosts:
            host.disconnect()

    logging.debug("Ranging test completed")
283
284
def main():
    """Run the Pica ranging test against a running Pica server."""
    # The module has no docstring, so the module-level __doc__ is None; use
    # this function's docstring as the --help description instead.
    parser = argparse.ArgumentParser(description=main.__doc__)
    parser.add_argument(
        "--address",
        type=str,
        default="127.0.0.1",
        help="Select the pica server address",
    )
    parser.add_argument(
        "--uci-port", type=int, default=7000, help="Select the pica TCP UCI port"
    )
    # The parsed option names (address, uci_port) match run()'s parameters.
    asyncio.run(run(**vars(parser.parse_args())))
298
299
# Run the ranging test only when this file is executed as the main module,
# not when it is imported.
if __name__ == "__main__":
    main()
302