# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3
"""Mobly test for Wi-Fi p2p group owner negotiation between two devices."""

from collections.abc import Sequence
import dataclasses
import datetime
import logging

from mobly import asserts
from mobly import base_test
from mobly import records
from mobly import test_runner
from mobly import utils
from mobly.controllers import android_device
from mobly.controllers.android_device_lib import callback_handler_v2
from mobly.snippet import errors
import wifi_test_utils

from direct import constants

# Timeout applied to every snippet event wait in this module.
_DEFAULT_TIMEOUT = datetime.timedelta(seconds=30)


@dataclasses.dataclass(frozen=True)
class DeviceState:
    """All objects related to operating p2p snippet RPCs.

    Attributes:
        ad: The Android device controller object.
        p2p_device: The object that represents a Wi-Fi p2p device.
        broadcast_receiver: The callback handler returned by the snippet's
            `wifiP2pInitialize` RPC, from which Wi-Fi p2p events are read.
    """

    ad: android_device.AndroidDevice
    p2p_device: constants.WifiP2pDevice
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2


def _setup_wifi_p2p(ad: android_device.AndroidDevice) -> DeviceState:
    """Sets up Wi-Fi p2p for automation tests on an Android device.

    Initializes p2p, deletes all persistent groups, and reads the local p2p
    device information.

    Args:
        ad: The Android device controller object.

    Returns:
        The state bundle needed to drive p2p operations on this device.
    """
    broadcast_receiver = _init_wifi_p2p(ad)
    _delete_all_persistent_groups(ad)
    p2p_device = _get_p2p_device(ad)
    # An anonymized address cannot be matched against discovered peers later.
    asserts.assert_not_equal(
        p2p_device.device_address,
        constants.ANONYMIZED_MAC_ADDRESS,
        'Failed to get p2p device MAC address, please check permissions '
        'required by API WifiP2pManager#requestConnectionInfo',
    )
    return DeviceState(
        ad=ad, p2p_device=p2p_device, broadcast_receiver=broadcast_receiver
    )


def _init_wifi_p2p(
    ad: android_device.AndroidDevice,
) -> callback_handler_v2.CallbackHandlerV2:
    """Registers the snippet app with the Wi-Fi p2p framework.

    This must be the first to be called before any p2p operations are
    performed.

    Args:
        ad: The Android device controller object.

    Returns:
        The broadcast receiver from which you can get snippet events
        corresponding to Wi-Fi p2p intents received on device.
    """
    broadcast_receiver = ad.wifi.wifiP2pInitialize()
    init_event = broadcast_receiver.waitAndGet(
        event_name=constants.WIFI_P2P_STATE_CHANGED_ACTION,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    state = constants.ExtraWifiState(
        init_event.data[constants.EXTRA_WIFI_STATE]
    )
    asserts.assert_equal(
        state,
        constants.ExtraWifiState.WIFI_P2P_STATE_ENABLED,
        f'Failed to initialize Wi-Fi P2P, state: {state}',
    )
    return broadcast_receiver


def _capture_p2p_intents(
    ad: android_device.AndroidDevice,
) -> callback_handler_v2.CallbackHandlerV2:
    """Starts capturing Wi-Fi p2p intents and returns the intent receiver."""
    return ad.wifi.wifiP2pCaptureP2pIntents()


def _delete_all_persistent_groups(
    ad: android_device.AndroidDevice,
) -> None:
    """Deletes all persistent Wi-Fi p2p groups.

    Args:
        ad: The Android device controller object.

    Raises:
        RuntimeError: If deleting any persistent group does not report
            success.
    """
    groups = _request_persistent_group_info(ad)
    ad.log.debug('Wi-Fi p2p persistent groups before delete: %s', groups)
    for group in groups:
        result_data = ad.wifi.wifiP2pDeletePersistentGroup(group.network_id)
        result = result_data[constants.EVENT_KEY_CALLBACK_NAME]
        if result != constants.ACTION_LISTENER_ON_SUCCESS:
            reason = constants.ActionListenerOnFailure(
                result_data[constants.EVENT_KEY_REASON]
            )
            raise RuntimeError(
                'Failed to delete persistent group with network id '
                f'{group.network_id}. Reason: {reason.name}'
            )
    # Query again purely so the post-delete state shows up in the debug log.
    groups = _request_persistent_group_info(ad)
    ad.log.debug('Wi-Fi p2p persistent groups after delete: %s', groups)


def _request_persistent_group_info(
    ad: android_device.AndroidDevice,
) -> Sequence[constants.WifiP2pGroup]:
    """Requests persistent group information."""
    callback_handler = ad.wifi.wifiP2pRequestPersistentGroupInfo()
    event = callback_handler.waitAndGet(
        event_name=constants.ON_PERSISTENT_GROUP_INFO_AVAILABLE,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    return constants.WifiP2pGroup.from_dict_list(event.data['groupList'])


def _get_p2p_device(
    ad: android_device.AndroidDevice,
) -> constants.WifiP2pDevice:
    """Gets the Wi-Fi p2p device information."""
    callback_handler = ad.wifi.wifiP2pRequestDeviceInfo()
    event = callback_handler.waitAndGet(
        event_name=constants.ON_DEVICE_INFO_AVAILABLE,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    return constants.WifiP2pDevice.from_dict(
        event.data[constants.EVENT_KEY_P2P_DEVICE]
    )


def _find_p2p_device(
    requester: DeviceState,
    responder: DeviceState,
) -> constants.WifiP2pDevice:
    """Initiates Wi-Fi p2p discovery for the requester to find the responder.

    This initiates Wi-Fi p2p discovery on both devices and checks that the
    requester can discover responder and return peer p2p device.

    Args:
        requester: The requester device.
        responder: The responder device.

    Returns:
        The single peer in the requester's peer list whose address matches
        the responder's p2p device address.
    """
    requester.ad.log.debug('Discovering Wi-Fi P2P peers.')
    responder.ad.wifi.wifiP2pDiscoverPeers()

    # Drop earlier peer-list events so the wait below sees a fresh one.
    _clear_events(requester, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    requester.ad.wifi.wifiP2pDiscoverPeers()

    event = requester.broadcast_receiver.waitAndGet(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    requester_peers = constants.WifiP2pDevice.from_dict_list(
        event.data[constants.EVENT_KEY_PEER_LIST]
    )

    responder_mac = responder.p2p_device.device_address
    filtered_peers = [
        peer for peer in requester_peers
        if peer.device_address == responder_mac
    ]
    if not filtered_peers:
        asserts.fail(
            f'{requester.ad} did not find the responder device. Responder MAC '
            f'address: {responder_mac}, found peers: {requester_peers}.'
        )
    if len(filtered_peers) > 1:
        asserts.fail(
            f'{requester.ad} found more than one responder device. Responder '
            f'MAC address: {responder_mac}, found peers: {requester_peers}.'
        )
    return filtered_peers[0]


def _p2p_connect_with_push_button(
    requester: DeviceState,
    responder: DeviceState,
) -> None:
    """Establishes Wi-Fi p2p connection with WPS push button configuration.

    This initiates p2p connection on requester, accepts invitation on
    responder, and checks connection status on both devices.

    Args:
        requester: The requester device.
        responder: The responder device.
    """
    logging.info('Establishing a p2p connection through WPS PBC.')

    # Clear events in broadcast receiver.
    _clear_events(requester, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    _clear_events(requester, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)
    _clear_events(responder, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    _clear_events(responder, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)

    # Send P2P connect invitation from requester.
    config = constants.WifiP2pConfig(
        device_address=responder.p2p_device.device_address,
        wps_setup=constants.WpsInfo.PBC,
    )
    requester.ad.wifi.wifiP2pConnect(config.to_dict())
    requester.ad.log.info(
        'Successfully sent P2P connect invitation to responder.'
    )

    # Click accept button on responder.
    responder.ad.wifi.wifiP2pAcceptInvitation(
        requester.p2p_device.device_name
    )
    responder.ad.log.info('Accepted connect invitation.')

    # Check p2p status on requester.
    _wait_connection_notice(requester.broadcast_receiver)
    _wait_peer_connected(
        requester.broadcast_receiver,
        responder.p2p_device.device_address,
    )
    requester.ad.log.debug(
        'Connected with device %s through wifi p2p.',
        responder.p2p_device.device_address,
    )

    # Check p2p status on responder.
    _wait_connection_notice(responder.broadcast_receiver)
    _wait_peer_connected(
        responder.broadcast_receiver,
        requester.p2p_device.device_address,
    )
    responder.ad.log.debug(
        'Connected with device %s through wifi p2p.',
        requester.p2p_device.device_address,
    )

    logging.info('Established wifi p2p connection.')


def _wait_peer_connected(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2,
    peer_address: str,
) -> None:
    """Waits for event that indicates expected Wi-Fi p2p peer is connected.

    Args:
        broadcast_receiver: The handler that delivers p2p intent events.
        peer_address: The p2p device address of the expected peer.
    """

    def _is_peer_connected(event):
        devices = constants.WifiP2pDevice.from_dict_list(
            event.data['peerList']
        )
        return any(
            device.device_address == peer_address
            and device.status == constants.WifiP2pDeviceStatus.CONNECTED
            for device in devices
        )

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        predicate=_is_peer_connected,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )


def _wait_connection_notice(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2,
) -> None:
    """Waits for event that indicates a p2p connection is established.

    Args:
        broadcast_receiver: The handler that delivers p2p intent events.
    """

    def _is_group_formed(event):
        # Events without p2p info are treated as "not connected yet".
        try:
            p2p_info = constants.WifiP2pInfo.from_dict(
                event.data[constants.EVENT_KEY_P2P_INFO]
            )
            return p2p_info.group_formed
        except KeyError:
            return False

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
        predicate=_is_group_formed,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )


def _remove_group_and_verify_disconnected(
    requester: DeviceState,
    responder: DeviceState,
) -> None:
    """Stops p2p connection and verifies disconnection status on devices.

    Args:
        requester: The requester device, which initiates the group removal.
        responder: The responder device.
    """
    logging.info('Stopping wifi p2p connection.')

    # Clear events in broadcast receiver. Peers-changed events are cleared
    # too: the waits below listen for them, and a stale event in which the
    # peer is absent would be accepted as "disconnected".
    _clear_events(requester, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    _clear_events(requester, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)
    _clear_events(requester, constants.ON_DEVICE_INFO_AVAILABLE)
    _clear_events(responder, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    _clear_events(responder, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)
    _clear_events(responder, constants.ON_DEVICE_INFO_AVAILABLE)

    # Requester initiates p2p group removal.
    requester.ad.wifi.wifiP2pRemoveGroup()

    # Check p2p status on requester.
    _wait_disconnection_notice(requester.broadcast_receiver)
    _wait_peer_disconnected(
        requester.broadcast_receiver, responder.p2p_device.device_address
    )
    requester.ad.log.debug(
        'Disconnected with device %s through wifi p2p.',
        responder.p2p_device.device_address,
    )

    # Check p2p status on responder.
    _wait_disconnection_notice(responder.broadcast_receiver)
    _wait_peer_disconnected(
        responder.broadcast_receiver, requester.p2p_device.device_address
    )
    responder.ad.log.debug(
        'Disconnected with device %s through wifi p2p.',
        requester.p2p_device.device_address,
    )

    logging.info('Stopped wifi p2p connection.')


def _wait_disconnection_notice(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2,
) -> None:
    """Waits for event that indicates the p2p connection is disconnected.

    Args:
        broadcast_receiver: The handler that delivers p2p intent events.
    """

    def _is_disconnect_event(event):
        info = constants.WifiP2pInfo.from_dict(
            event.data[constants.EVENT_KEY_P2P_INFO]
        )
        return not info.group_formed

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
        predicate=_is_disconnect_event,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )


def _wait_peer_disconnected(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2,
    target_address: str,
) -> None:
    """Waits for event that indicates current Wi-Fi p2p peer is disconnected.

    Args:
        broadcast_receiver: The handler that delivers p2p intent events.
        target_address: The p2p device address of the peer expected to be
            disconnected.
    """

    def _is_peer_disconnect_event(event):
        devices = constants.WifiP2pDevice.from_dict_list(
            event.data[constants.EVENT_KEY_PEER_LIST]
        )
        for device in devices:
            if device.device_address == target_address:
                return device.status != constants.WifiP2pDeviceStatus.CONNECTED
        # Target device not found also means it is disconnected.
        return True

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        predicate=_is_peer_disconnect_event,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )


def _clear_events(device: DeviceState, event_name: str) -> None:
    """Clears the events with the given name in the broadcast receiver.

    Args:
        device: The device whose broadcast receiver is drained.
        event_name: The name of the events to clear.
    """
    # Presumably `getAll` removes the returned events from the handler's
    # queue, which is what makes this a "clear" -- confirm against Mobly.
    all_events = device.broadcast_receiver.getAll(event_name)
    device.ad.log.debug(
        'Cleared %d events of event name %s', len(all_events), event_name
    )


def _teardown_wifi_p2p(ad: android_device.AndroidDevice) -> None:
    """Destroys all resources initialized in `_setup_wifi_p2p`."""
    ad.wifi.wifiP2pStopPeerDiscovery()
    ad.wifi.wifiP2pCancelConnect()
    ad.wifi.wifiP2pRemoveGroup()
    ad.wifi.p2pClose()


class GroupOwnerNegotiationTest(base_test.BaseTestClass):
    """Group owner negotiation tests."""

    ads: Sequence[android_device.AndroidDevice]
    requester_ad: android_device.AndroidDevice
    responder_ad: android_device.AndroidDevice

    def setup_class(self) -> None:
        """Registers at least two devices and prepares them concurrently."""
        super().setup_class()
        self.ads = self.register_controller(android_device, min_number=2)
        # The first registered device acts as responder, the second as
        # requester; any further devices are set up but otherwise unused.
        self.responder_ad, self.requester_ad, *_ = self.ads
        self.responder_ad.debug_tag = f'{self.responder_ad.serial}(Responder)'
        self.requester_ad.debug_tag = f'{self.requester_ad.serial}(Requester)'
        utils.concurrent_exec(
            self._setup_device,
            param_list=[[ad] for ad in self.ads],
            raise_on_exception=True,
        )

    def _setup_device(self, ad: android_device.AndroidDevice) -> None:
        """Loads the Wi-Fi snippet, wakes the device, and resets Wi-Fi."""
        ad.load_snippet('wifi', constants.WIFI_SNIPPET_PACKAGE_NAME)
        wifi_test_utils.set_screen_on_and_unlock(ad)
        # Clear all saved Wi-Fi networks.
        ad.wifi.wifiDisable()
        ad.wifi.wifiClearConfiguredNetworks()
        ad.wifi.wifiEnable()

    def test_group_owner_negotiation_with_push_button(self) -> None:
        """Test against group owner negotiation and WPS PBC (push button).

        Steps:
            1. Initialize Wi-Fi p2p on both responder and requester device.
            2. Initiate p2p discovery. Requester should be able to find
               the responder.
            3. Establish a p2p connection with WPS PBC (push button
               configuration). Requester initiates a connection request.
               Responder clicks accept button to accept the connection.
            4. Stop the connection.
        """
        responder = _setup_wifi_p2p(self.responder_ad)
        requester = _setup_wifi_p2p(self.requester_ad)

        requester_peer_p2p_device = _find_p2p_device(requester, responder)

        # Make sure that peer is not a group owner (GO) as this is testing
        # against GO negotiation.
        asserts.assert_false(
            requester_peer_p2p_device.is_group_owner,
            f'{requester.ad} found target responder device with invalid role.'
            ' It should not be group owner.',
        )

        _p2p_connect_with_push_button(requester, responder)

        _remove_group_and_verify_disconnected(requester, responder)

    def _teardown_device(self, ad: android_device.AndroidDevice) -> None:
        """Releases p2p resources and saves output excerpts for one device."""
        _teardown_wifi_p2p(ad)
        ad.services.create_output_excerpts_all(self.current_test_info)

    def teardown_test(self) -> None:
        """Tears down every registered device concurrently."""
        utils.concurrent_exec(
            self._teardown_device,
            param_list=[[ad] for ad in self.ads],
            raise_on_exception=True,
        )

    def on_fail(self, record: records.TestResult) -> None:
        """Collects bug reports from all devices when a test fails."""
        logging.info('Collecting bugreports...')
        android_device.take_bug_reports(
            self.ads, destination=self.current_test_info.output_path
        )


if __name__ == '__main__':
    test_runner.main()