1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17from collections.abc import Sequence
18import dataclasses
19import datetime
20import logging
21
22from mobly import asserts
23from mobly import base_test
24from mobly import records
25from mobly import test_runner
26from mobly import utils
27from mobly.controllers import android_device
28from mobly.controllers.android_device_lib import callback_handler_v2
29from mobly.snippet import errors
30import wifi_test_utils
31
32from direct import constants
33
# Upper bound for each individual wait on a snippet event in this module; it
# is passed (in seconds) as `timeout` to every `waitAndGet`/`waitForEvent`.
_DEFAULT_TIMEOUT = datetime.timedelta(seconds=30)
35
36
@dataclasses.dataclass(frozen=True)
class DeviceState:
    """All objects related to operating p2p snippet RPCs.

    Attributes:
        ad: The Android device controller object.
        p2p_device: The object that represents a Wi-Fi p2p device.
        broadcast_receiver: The callback handler returned when Wi-Fi p2p
            was initialized on the device; snippet events corresponding to
            Wi-Fi p2p intents received on the device are read from it.
    """

    ad: android_device.AndroidDevice
    p2p_device: constants.WifiP2pDevice
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2
49
50
def _setup_wifi_p2p(ad: android_device.AndroidDevice) -> DeviceState:
    """Sets up Wi-Fi p2p for automation tests on an Android device.

    Initializes Wi-Fi p2p, deletes all persistent p2p groups and fetches the
    device's own p2p information.

    Args:
        ad: The Android device controller object.

    Returns:
        The controller, p2p device information and broadcast receiver of the
        device, bundled in a `DeviceState`.

    The test is failed (through `asserts.assert_not_equal`) when the reported
    device address equals `constants.ANONYMIZED_MAC_ADDRESS`.
    """
    broadcast_receiver = _init_wifi_p2p(ad)
    _delete_all_persistent_groups(ad)
    p2p_device = _get_p2p_device(ad)
    # The device information is obtained through `wifiP2pRequestDeviceInfo`,
    # so the hint names the API whose permissions gate the real MAC address.
    asserts.assert_not_equal(
        p2p_device.device_address,
        constants.ANONYMIZED_MAC_ADDRESS,
        'Failed to get p2p device MAC address, please check permissions '
        'required by API WifiP2pManager#requestDeviceInfo',
    )
    return DeviceState(
        ad=ad, p2p_device=p2p_device, broadcast_receiver=broadcast_receiver
    )
65
66
def _init_wifi_p2p(
    ad: android_device.AndroidDevice,
) -> callback_handler_v2.CallbackHandlerV2:
    """Initializes Wi-Fi p2p on the device through the snippet app.

    Call this before any other p2p operation. After initialization it waits
    for a p2p state-changed event and fails the test unless the reported
    state is `WIFI_P2P_STATE_ENABLED`.

    Args:
        ad: The Android device controller object.

    Returns:
        The broadcast receiver from which snippet events corresponding to
        Wi-Fi p2p intents received on the device can be read.
    """
    receiver = ad.wifi.wifiP2pInitialize()
    timeout_sec = _DEFAULT_TIMEOUT.total_seconds()
    state_event = receiver.waitAndGet(
        event_name=constants.WIFI_P2P_STATE_CHANGED_ACTION,
        timeout=timeout_sec,
    )
    raw_state = state_event.data[constants.EXTRA_WIFI_STATE]
    state = constants.ExtraWifiState(raw_state)
    asserts.assert_equal(
        state,
        constants.ExtraWifiState.WIFI_P2P_STATE_ENABLED,
        f'Failed to initialize Wi-Fi P2P, state: {state}',
    )
    return receiver
95
96
def _capture_p2p_intents(
    ad: android_device.AndroidDevice,
) -> callback_handler_v2.CallbackHandlerV2:
    """Begins capturing Wi-Fi p2p intents on the device.

    Args:
        ad: The Android device controller object.

    Returns:
        The intent receiver handed back by the snippet RPC.
    """
    return ad.wifi.wifiP2pCaptureP2pIntents()
103
104
def _delete_all_persistent_groups(
    ad: android_device.AndroidDevice,
) -> None:
    """Removes every persistent Wi-Fi p2p group known to the device.

    The group list is logged before and after the deletion.

    Args:
        ad: The Android device controller object.

    Raises:
        RuntimeError: If deleting any group does not report success.
    """
    existing_groups = _request_persistent_group_info(ad)
    ad.log.debug(
        'Wi-Fi p2p persistent groups before delete: %s', existing_groups
    )
    for group in existing_groups:
        response = ad.wifi.wifiP2pDeletePersistentGroup(group.network_id)
        callback_name = response[constants.EVENT_KEY_CALLBACK_NAME]
        if callback_name == constants.ACTION_LISTENER_ON_SUCCESS:
            continue
        # Anything other than the success callback carries a failure reason.
        reason = constants.ActionListenerOnFailure(
            response[constants.EVENT_KEY_REASON]
        )
        raise RuntimeError(
            'Failed to delete persistent group with network id '
            f'{group.network_id}. Reason: {reason.name}'
        )
    remaining_groups = _request_persistent_group_info(ad)
    ad.log.debug(
        'Wi-Fi p2p persistent groups after delete: %s', remaining_groups
    )
124
125
def _request_persistent_group_info(
    ad: android_device.AndroidDevice,
) -> Sequence[constants.WifiP2pGroup]:
    """Fetches the persistent Wi-Fi p2p groups stored on the device.

    Args:
        ad: The Android device controller object.

    Returns:
        The groups parsed from the `groupList` field of the
        persistent-group-info event.
    """
    handler = ad.wifi.wifiP2pRequestPersistentGroupInfo()
    group_info_event = handler.waitAndGet(
        event_name=constants.ON_PERSISTENT_GROUP_INFO_AVAILABLE,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    raw_groups = group_info_event.data['groupList']
    return constants.WifiP2pGroup.from_dict_list(raw_groups)
137
138
def _get_p2p_device(
    ad: android_device.AndroidDevice,
) -> constants.WifiP2pDevice:
    """Fetches the Wi-Fi p2p device information of the given device.

    Args:
        ad: The Android device controller object.

    Returns:
        The p2p device parsed from the device-info-available event.
    """
    handler = ad.wifi.wifiP2pRequestDeviceInfo()
    info_event = handler.waitAndGet(
        event_name=constants.ON_DEVICE_INFO_AVAILABLE,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    device_dict = info_event.data[constants.EVENT_KEY_P2P_DEVICE]
    return constants.WifiP2pDevice.from_dict(device_dict)
151
152
def _find_p2p_device(
    requester: DeviceState,
    responder: DeviceState,
) -> constants.WifiP2pDevice:
    """Runs Wi-Fi p2p discovery so that the requester finds the responder.

    Discovery is started on both devices. The peer list of the first
    peers-changed event received on the requester afterwards is searched for
    the responder's device address.

    Args:
        requester: The device that looks for its peer.
        responder: The device that is expected to be found.

    Returns:
        The responder as seen in the requester's peer list.

    The test is failed through `asserts.fail` when the peer list contains no
    entry, or more than one entry, with the responder's address.
    """
    requester.ad.log.debug('Discovering Wi-Fi P2P peers.')
    responder.ad.wifi.wifiP2pDiscoverPeers()

    # Drop older peers-changed events before starting discovery on the
    # requester, so the event read below is not a leftover.
    _clear_events(requester, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    requester.ad.wifi.wifiP2pDiscoverPeers()

    peers_event = requester.broadcast_receiver.waitAndGet(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    raw_peers = peers_event.data[constants.EVENT_KEY_PEER_LIST]
    requester_peers = constants.WifiP2pDevice.from_dict_list(raw_peers)

    responder_mac = responder.p2p_device.device_address
    matches = [
        candidate
        for candidate in requester_peers
        if candidate.device_address == responder_mac
    ]
    if not matches:
        asserts.fail(
            f'{requester.ad} did not find the responder device. Responder MAC '
            f'address: {responder_mac}, found peers: {requester_peers}.'
        )
    if len(matches) > 1:
        asserts.fail(
            f'{requester.ad} found more than one responder device. Responder '
            f'MAC address: {responder_mac}, found peers: {requester_peers}.'
        )
    return matches[0]
191
192
def _p2p_connect_with_push_button(
    requester: DeviceState,
    responder: DeviceState,
) -> None:
    """Establishes Wi-Fi p2p connection with WPS push button configuration.

    This initiates p2p connection on requester, accepts invitation on responder,
    and checks connection status on both devices.

    Args:
        requester: The requester device.
        responder: The responder device.
    """
    logging.info('Establishing a p2p connection through WPS PBC.')

    # Clear events in broadcast receiver so that the waits below only see
    # events produced by this connection attempt.
    _clear_events(requester, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    _clear_events(requester, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)
    _clear_events(responder, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
    _clear_events(responder, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)

    # Send P2P connect invitation from requester.
    config = constants.WifiP2pConfig(
        device_address=responder.p2p_device.device_address,
        wps_setup=constants.WpsInfo.PBC,
    )
    requester.ad.wifi.wifiP2pConnect(config.to_dict())
    requester.ad.log.info(
        'Successfully sent P2P connect invitation to responder.'
    )

    # Click accept button on responder.
    responder.ad.wifi.wifiP2pAcceptInvitation(requester.p2p_device.device_name)
    responder.ad.log.info('Accepted connect invitation.')

    # Check p2p status on requester.
    _wait_connection_notice(requester.broadcast_receiver)
    _wait_peer_connected(
        requester.broadcast_receiver,
        responder.p2p_device.device_address,
    )
    requester.ad.log.debug(
        'Connected with device %s through wifi p2p.',
        responder.p2p_device.device_address,
    )

    # Check p2p status on responder.
    _wait_connection_notice(responder.broadcast_receiver)
    _wait_peer_connected(
        responder.broadcast_receiver,
        requester.p2p_device.device_address,
    )
    responder.ad.log.debug(
        'Connected with device %s through wifi p2p.',
        requester.p2p_device.device_address,
    )

    logging.info('Established wifi p2p connection.')
254
255
def _wait_peer_connected(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2, peer_address: str
) -> None:
    """Waits for event that indicates expected Wi-Fi p2p peer is connected.

    Args:
        broadcast_receiver: The receiver on which peers-changed events are
            awaited.
        peer_address: The device address of the peer expected to show up as
            connected.
    """

    def _is_peer_connected(event):
        # Read the peer list through the shared key constant, like the other
        # consumers of peers-changed events in this module.
        devices = constants.WifiP2pDevice.from_dict_list(
            event.data[constants.EVENT_KEY_PEER_LIST]
        )
        return any(
            device.device_address == peer_address
            and device.status == constants.WifiP2pDeviceStatus.CONNECTED
            for device in devices
        )

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        predicate=_is_peer_connected,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
276
277
def _wait_connection_notice(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2,
) -> None:
    """Waits for event that indicates a p2p connection is established.

    Args:
        broadcast_receiver: The receiver on which connection-changed events
            are awaited.
    """

    def _is_group_formed(event):
        # A KeyError while reading or parsing the p2p info makes the event
        # count as "not formed" instead of aborting the wait.
        try:
            p2p_info = constants.WifiP2pInfo.from_dict(
                event.data[constants.EVENT_KEY_P2P_INFO]
            )
            return p2p_info.group_formed
        except KeyError:
            return False

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
        predicate=_is_group_formed,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
297
298
def _remove_group_and_verify_disconnected(
    requester: DeviceState,
    responder: DeviceState,
) -> None:
    """Stops p2p connection and verifies disconnection status on devices.

    The requester removes the p2p group; afterwards both devices are expected
    to receive a connection-changed event without a formed group and a
    peers-changed event in which the other device is no longer connected.

    Args:
        requester: The device that initiates the group removal.
        responder: The other device of the p2p connection.
    """
    logging.info('Stopping wifi p2p connection.')

    # Clear events in broadcast receiver. Peers-changed events must be
    # dropped too: the disconnection check below accepts any peers-changed
    # event that does not list the peer as connected, so an event queued
    # before the removal could satisfy it prematurely.
    stale_event_names = (
        constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
        constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        constants.ON_DEVICE_INFO_AVAILABLE,
    )
    for device in (requester, responder):
        for event_name in stale_event_names:
            _clear_events(device, event_name)

    # Requester initiates p2p group removal.
    requester.ad.wifi.wifiP2pRemoveGroup()

    # Check p2p status on requester.
    _wait_disconnection_notice(requester.broadcast_receiver)
    _wait_peer_disconnected(
        requester.broadcast_receiver, responder.p2p_device.device_address
    )
    requester.ad.log.debug(
        'Disconnected with device %s through wifi p2p.',
        responder.p2p_device.device_address,
    )

    # Check p2p status on responder.
    _wait_disconnection_notice(responder.broadcast_receiver)
    _wait_peer_disconnected(
        responder.broadcast_receiver, requester.p2p_device.device_address
    )
    responder.ad.log.debug(
        'Disconnected with device %s through wifi p2p.',
        requester.p2p_device.device_address,
    )

    logging.info('Stopped wifi p2p connection.')
336
337
def _wait_disconnection_notice(broadcast_receiver):
    """Blocks until a connection-changed event reports no formed p2p group.

    Args:
        broadcast_receiver: The receiver on which connection-changed events
            are awaited.
    """

    def _group_no_longer_formed(event):
        raw_info = event.data[constants.EVENT_KEY_P2P_INFO]
        p2p_info = constants.WifiP2pInfo.from_dict(raw_info)
        return not p2p_info.group_formed

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
        predicate=_group_no_longer_formed,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
352
353
def _wait_peer_disconnected(broadcast_receiver, target_address):
    """Blocks until a peers-changed event shows the peer as not connected.

    Args:
        broadcast_receiver: The receiver on which peers-changed events are
            awaited.
        target_address: The device address of the peer that should no longer
            be connected.
    """

    def _peer_is_gone(event):
        peers = constants.WifiP2pDevice.from_dict_list(
            event.data[constants.EVENT_KEY_PEER_LIST]
        )
        # Only the first entry carrying the target address is considered.
        target = next(
            (peer for peer in peers if peer.device_address == target_address),
            None,
        )
        if target is None:
            # A peer missing from the list counts as disconnected as well.
            return True
        return target.status != constants.WifiP2pDeviceStatus.CONNECTED

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        predicate=_peer_is_gone,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
372
373
def _clear_events(device: DeviceState, event_name):
    """Drains the events of the given name from the broadcast receiver.

    All currently queued events with that name are fetched with `getAll` and
    discarded; only their count is logged.

    Args:
        device: The device whose broadcast receiver is drained.
        event_name: The name of the events to discard.
    """
    drained_events = device.broadcast_receiver.getAll(event_name)
    drained_count = len(drained_events)
    device.ad.log.debug(
        'Cleared %d events of event name %s', drained_count, event_name
    )
380
381
def _teardown_wifi_p2p(ad: android_device.AndroidDevice):
    """Releases everything that `_setup_wifi_p2p` set up on the device.

    Stops peer discovery, cancels any connect attempt, removes the p2p group
    and finally closes p2p, in that order.

    Args:
        ad: The Android device controller object.
    """
    wifi_snippet = ad.wifi
    wifi_snippet.wifiP2pStopPeerDiscovery()
    wifi_snippet.wifiP2pCancelConnect()
    wifi_snippet.wifiP2pRemoveGroup()
    wifi_snippet.p2pClose()
388
389
class GroupOwnerNegotiationTest(base_test.BaseTestClass):
    """Group owner negotiation tests.

    Attributes:
        ads: All registered Android devices (at least two).
        requester_ad: The device that initiates the p2p connection.
        responder_ad: The device that accepts the p2p connection.
    """

    ads: Sequence[android_device.AndroidDevice]
    requester_ad: android_device.AndroidDevice
    responder_ad: android_device.AndroidDevice

    def setup_class(self) -> None:
        """Registers the devices, assigns roles and prepares each device."""
        super().setup_class()
        self.ads = self.register_controller(android_device, min_number=2)
        # The first registered device acts as responder, the second one as
        # requester; any further devices get no role.
        self.responder_ad, self.requester_ad, *_ = self.ads
        self.responder_ad.debug_tag = f'{self.responder_ad.serial}(Responder)'
        self.requester_ad.debug_tag = f'{self.requester_ad.serial}(Requester)'
        utils.concurrent_exec(
            self._setup_device,
            param_list=[[ad] for ad in self.ads],
            raise_on_exception=True,
        )

    def _setup_device(self, ad: android_device.AndroidDevice) -> None:
        """Loads the Wi-Fi snippet on a device and resets its Wi-Fi state.

        Args:
            ad: The Android device controller object.
        """
        ad.load_snippet('wifi', constants.WIFI_SNIPPET_PACKAGE_NAME)
        wifi_test_utils.set_screen_on_and_unlock(ad)
        # Clear all saved Wi-Fi networks.
        ad.wifi.wifiDisable()
        ad.wifi.wifiClearConfiguredNetworks()
        ad.wifi.wifiEnable()

    def test_group_owner_negotiation_with_push_button(self) -> None:
        """Test against group owner negotiation and WPS PBC (push button).

        Steps:
            1. Initialize Wi-Fi p2p on both responder and requester device.
            2. Initiate p2p discovery. Requester should be able to find
               the responder.
            3. Establish a p2p connection with WPS PBC (push button
               configuration). Requester initiates a connection request.
               Responder clicks accept button to accept the connection.
            4. Stop the connection.
        """
        responder = _setup_wifi_p2p(self.responder_ad)
        requester = _setup_wifi_p2p(self.requester_ad)

        requester_peer_p2p_device = _find_p2p_device(requester, responder)

        # Make sure that peer is not a group owner (GO) as this is testing
        # against GO negotiation.
        asserts.assert_false(
            requester_peer_p2p_device.is_group_owner,
            f'{requester.ad} found target responder device with invalid role.'
            ' It should not be group owner.',
        )

        _p2p_connect_with_push_button(requester, responder)

        _remove_group_and_verify_disconnected(requester, responder)

    def _teardown_device(self, ad: android_device.AndroidDevice) -> None:
        """Tears down Wi-Fi p2p on a device and saves its output excerpts.

        Args:
            ad: The Android device controller object.
        """
        _teardown_wifi_p2p(ad)
        ad.services.create_output_excerpts_all(self.current_test_info)

    def teardown_test(self) -> None:
        """Runs the per-device teardown on all devices concurrently."""
        utils.concurrent_exec(
            self._teardown_device,
            param_list=[[ad] for ad in self.ads],
            raise_on_exception=True,
        )

    def on_fail(self, record: records.TestResult) -> None:
        """Collects bugreports from all devices into the test output path.

        Args:
            record: The result record of the failed test (not used here).
        """
        logging.info('Collecting bugreports...')
        android_device.take_bug_reports(
            self.ads, destination=self.current_test_info.output_path
        )
462
463
# Hand control to the Mobly test runner when this file is run as a script.
if __name__ == '__main__':
    test_runner.main()
466