1# Lint as: python2, python3
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5""" Server-side bluetooth adapter tests that involve suspend/resume with peers
6
7paired and/or connected.
8
9Single btpeer tests:
10  - Reconnect on resume test
11    - Classic HID
12    - LE HID
13    - A2DP
14  - Wake from suspend test
15    - Classic HID
16    - LE HID
17    - A2DP shouldn't wake from suspend
18  - Suspend while discovering (discovering should pause and unpause)
19  - Suspend while advertising (advertising should pause and unpause)
20
21Multiple btpeer tests:
22  - Reconnect on resume test
23    - One classic HID, One LE HID
24    - Two classic HID
25    - Two LE HID
26  - Wake from suspend test
27    - Two classic HID
    - Two LE HID
29"""
30from __future__ import absolute_import
31from __future__ import division
32from __future__ import print_function
33
34import logging
35import time
36
37from autotest_lib.client.cros.bluetooth.bluetooth_audio_test_data import A2DP
38from autotest_lib.server.cros.bluetooth.bluetooth_adapter_tests import (
39        TABLET_MODELS, SUSPEND_POWER_DOWN_CHIPSETS,
40        SUSPEND_RESET_IF_NO_PEER_CHIPSETS, SUSPEND_POWER_DOWN_MODELS)
41from autotest_lib.server.cros.bluetooth.bluetooth_adapter_audio_tests import (
42        BluetoothAdapterAudioTests)
43from autotest_lib.server.cros.bluetooth.bluetooth_adapter_quick_tests import (
44        BluetoothAdapterQuickTests)
45from autotest_lib.server.cros.bluetooth.bluetooth_adapter_quick_tests import (
46        PROFILE_CONNECT_WAIT, SUSPEND_SEC, EXPECT_NO_WAKE_SUSPEND_SEC)
47from six.moves import range
48
# Short aliases for the quick-test decorators applied to the test methods and
# the batch method of the class below.
test_wrapper = BluetoothAdapterQuickTests.quick_test_test_decorator
batch_wrapper = BluetoothAdapterQuickTests.quick_test_batch_decorator

# Number of suspend/resume iterations requested by the *_stress tests.
STRESS_ITERATIONS = 50
53
54
55class bluetooth_AdapterSRHealth(BluetoothAdapterQuickTests,
56                                BluetoothAdapterAudioTests):
57    """Server side bluetooth adapter suspend resume test with peer."""
58
59    def _test_keyboard_with_string(self, device):
60        return (self.test_hid_device_created(device.address)
61                and self.test_keyboard_input_from_trace(device, "simple_text"))
62
63    def _test_mouse(self, device):
64        return (self.test_hid_device_created(device.address)
65                and self.test_mouse_left_click(device)
66                and self.test_mouse_move_in_xy(device, -60, 100)
67                and self.test_mouse_scroll_down(device, 70)
68                and self.test_mouse_click_and_drag(device, 90, 30))
69
70    # ---------------------------------------------------------------
71    # Reconnect after suspend tests
72    # ---------------------------------------------------------------
73
74    def run_reconnect_device(self,
75                             devtuples,
76                             iterations=1,
77                             auto_reconnect=False):
78        """ Reconnects a device after suspend/resume.
79
80        @param devtuples: array of tuples consisting of the following
81                            * device_type: MOUSE, BLE_MOUSE, etc.
82                            * device: meta object for peer device
83                            * device_test: Optional; test function to run w/
84                                           device (for example, mouse click)
85        @params iterations: number of suspend/resume + reconnect iterations
86        @params auto_reconnect: Expect host to automatically reconnect to peer
87        """
88        boot_id = self.host.get_boot_id()
89
90        try:
91            # Set up the device; any failures should assert
92            for _, device, device_test in devtuples:
93                self.assert_discover_and_pair(device)
94                self.assert_on_fail(
95                        self.test_device_set_discoverable(device, False))
96                self.assert_on_fail(
97                        self.test_connection_by_adapter(device.address))
98
99                # Profile connection may not have completed yet and this will
100                # race with a subsequent disconnection (due to suspend). Use the
101                # device test to force profile connect or wait if no test was
102                # given.
103                if device_test is not None:
104                    self.assert_on_fail(device_test(device))
105                else:
106                    time.sleep(PROFILE_CONNECT_WAIT)
107
108            for it in range(iterations):
109                logging.info('Running iteration {}/{} of suspend reconnection'.
110                             format(it + 1, iterations))
111
112                # Start the suspend process
113                suspend = self.suspend_async(suspend_time=SUSPEND_SEC)
114                start_time = self.bluetooth_facade.get_device_utc_time()
115
116                # Trigger suspend, wait for regular resume, verify we can reconnect
117                # and run device specific test
118                self.test_suspend_and_wait_for_sleep(suspend,
119                                                     sleep_timeout=SUSPEND_SEC)
120                self.test_wait_for_resume(boot_id,
121                                          suspend,
122                                          resume_timeout=SUSPEND_SEC,
123                                          test_start_time=start_time)
124
125                # Only reconnect if we don't expect automatic reconnect.
126                # Let the devices initiate connections before the DUT initiates
127                # auto reconnections.
128                # Complete reconnecting all peers before running device tests.
129                # Otherwise, we may have a race between auto reconnection
130                # from the dut and peer initiated connection. See b/177870286
131                if not auto_reconnect:
132                    for device_type, device, _ in devtuples:
133                        if 'BLE' in device_type:
134                            # LE can't reconnect without
135                            # advertising/discoverable
136                            self.test_device_set_discoverable(device, True)
137                            # Make sure we're actually connected
138                            self.test_device_is_connected(device.address)
139                        else:
140                            # Classic requires peer to initiate a connection to
141                            # wake up the dut
142                            self.test_connection_by_device(device)
143
144                for _, device, device_test in devtuples:
145                    if device_test is not None:
146                        device_test(device)
147
148        finally:
149            for _, device, __ in devtuples:
150                self.test_remove_pairing(device.address)
151
152    @test_wrapper('Reconnect Classic HID', devices={'MOUSE': 1})
153    def sr_reconnect_classic_hid(self):
154        """ Reconnects a classic HID device after suspend/resume. """
155        device_type = 'MOUSE'
156        device = self.devices[device_type][0]
157        self.run_reconnect_device([(device_type, device,
158                                    self._test_mouse)])
159
160    @test_wrapper('Reconnect LE HID', devices={'BLE_MOUSE': 1})
161    def sr_reconnect_le_hid(self):
162        """ Reconnects a LE HID device after suspend/resume. """
163        device_type = 'BLE_MOUSE'
164        device = self.devices[device_type][0]
165        self.run_reconnect_device([(device_type, device,
166                                    self._test_mouse)])
167
168    # TODO(b/163143005) - Hana can't handle two concurrent HID connections
169    @test_wrapper('Reconnect Multiple Classic HID',
170                  devices={
171                          'MOUSE': 1,
172                          'KEYBOARD': 1
173                  },
174                  skip_models=['hana'])
175    def sr_reconnect_multiple_classic_hid(self):
176        """ Reconnects multiple classic HID devices after suspend/resume. """
177        devices = [('MOUSE', self.devices['MOUSE'][0],
178                    self._test_mouse),
179                   ('KEYBOARD', self.devices['KEYBOARD'][0],
180                    self._test_keyboard_with_string)]
181        self.run_reconnect_device(devices)
182
183    @test_wrapper('Reconnect Multiple LE HID',
184                  devices={
185                          'BLE_MOUSE': 1,
186                          'BLE_KEYBOARD': 1
187                  })
188    def sr_reconnect_multiple_le_hid(self):
189        """ Reconnects multiple LE HID devices after suspend/resume. """
190        devices = [('BLE_MOUSE', self.devices['BLE_MOUSE'][0],
191                    self._test_mouse),
192                   ('BLE_KEYBOARD', self.devices['BLE_KEYBOARD'][0],
193                    self._test_keyboard_with_string)]
194        self.run_reconnect_device(devices)
195
196    @test_wrapper('Reconnect one of each classic+LE HID',
197                  devices={
198                          'BLE_MOUSE': 1,
199                          'KEYBOARD': 1
200                  })
201    def sr_reconnect_multiple_classic_le_hid(self):
202        """ Reconnects one of each classic and LE HID devices after
203            suspend/resume.
204        """
205        devices = [('BLE_MOUSE', self.devices['BLE_MOUSE'][0],
206                    self._test_mouse),
207                   ('KEYBOARD', self.devices['KEYBOARD'][0],
208                    self._test_keyboard_with_string)]
209        self.run_reconnect_device(devices)
210
211    @test_wrapper('Reconnect Classic HID Stress Test', devices={'MOUSE': 1})
212    def sr_reconnect_classic_hid_stress(self):
213        """ Reconnects a classic HID device after suspend/resume. """
214        device_type = 'MOUSE'
215        device = self.devices[device_type][0]
216        self.run_reconnect_device(
217                [(device_type, device, self._test_mouse)],
218                iterations=STRESS_ITERATIONS)
219
220    @test_wrapper('Reconnect LE HID Stress Test', devices={'BLE_MOUSE': 1})
221    def sr_reconnect_le_hid_stress(self):
222        """ Reconnects a LE HID device after suspend/resume. """
223        device_type = 'BLE_MOUSE'
224        device = self.devices[device_type][0]
225        self.run_reconnect_device(
226                [(device_type, device, self._test_mouse)],
227                iterations=STRESS_ITERATIONS)
228
229    @test_wrapper('Reconnect A2DP',
230                  devices={'BLUETOOTH_AUDIO': 1},
231                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS)
232    def sr_reconnect_a2dp(self):
233        """ Reconnects an A2DP device after suspend/resume. """
234        device_type = 'BLUETOOTH_AUDIO'
235        device = self.devices[device_type][0]
236        self.initialize_bluetooth_audio(device, A2DP)
237        self.run_reconnect_device(
238                [(device_type, device, self.test_device_a2dp_connected)],
239                auto_reconnect=True)
240
241
242    # TODO(b/151332866) - Bob can't wake from suspend due to wrong power/wakeup
243    # TODO(b/150897528) - Dru is powered down during suspend, won't wake up
244    @test_wrapper('Peer wakeup Classic HID',
245                  devices={'MOUSE': 1},
246                  skip_models=TABLET_MODELS + SUSPEND_POWER_DOWN_MODELS +
247                  ['bob'],
248                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS)
249    def sr_peer_wake_classic_hid(self):
250        """ Use classic HID device to wake from suspend. """
251        device = self.devices['MOUSE'][0]
252        self.run_peer_wakeup_device('MOUSE',
253                                    device,
254                                    device_test=self._test_mouse)
255
256    # TODO(b/151332866) - Bob can't wake from suspend due to wrong power/wakeup
257    # TODO(b/150897528) - Dru is powered down during suspend, won't wake up
258    @test_wrapper('Peer wakeup LE HID',
259                  devices={'BLE_MOUSE': 1},
260                  skip_models=TABLET_MODELS + SUSPEND_POWER_DOWN_MODELS +
261                  ['bob'],
262                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS)
263    def sr_peer_wake_le_hid(self):
264        """ Use LE HID device to wake from suspend. """
265        device = self.devices['BLE_MOUSE'][0]
266        self.run_peer_wakeup_device('BLE_MOUSE',
267                                    device,
268                                    device_test=self._test_mouse)
269
270
271    # TODO(b/151332866) - Bob can't wake from suspend due to wrong power/wakeup
272    # TODO(b/150897528) - Dru is powered down during suspend, won't wake up
273    @test_wrapper('Peer wakeup LE HID with reconnect LE HID',
274                  devices={
275                          'BLE_MOUSE': 1,
276                          'BLE_KEYBOARD': 1
277                  },
278                  skip_models=TABLET_MODELS + SUSPEND_POWER_DOWN_MODELS +
279                  ['bob'],
280                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS)
281    def sr_peer_wake_le_hid_reconnect_le_hid(self):
282        """ Use LE HID device to wake from suspend. And reconnects a secondary
283            LE HID device afterwards
284        """
285        device = self.devices['BLE_MOUSE'][0]
286        device_reconnect = self.devices['BLE_KEYBOARD'][0]
287
288        self.assert_discover_and_pair(device_reconnect)
289        self.test_device_set_discoverable(device_reconnect, False)
290        self.test_connection_by_adapter(device_reconnect.address)
291        self._test_keyboard_with_string(device_reconnect)
292
293        self.run_peer_wakeup_device('BLE_MOUSE',
294                                    device,
295                                    device_test=self._test_mouse,
296                                    keep_paired=True)
297
298        self.test_device_set_discoverable(device_reconnect, True)
299        self.test_device_is_connected(device_reconnect.address)
300        self._test_keyboard_with_string(device_reconnect)
301
302
303    # TODO(b/151332866) - Bob can't wake from suspend due to wrong power/wakeup
304    # TODO(b/150897528) - Dru is powered down during suspend, won't wake up
305    @test_wrapper('Peer wakeup Classic HID',
306                  devices={'MOUSE': 1},
307                  skip_models=TABLET_MODELS + SUSPEND_POWER_DOWN_MODELS +
308                  ['bob'],
309                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS)
310    def sr_peer_wake_classic_hid_stress(self):
311        """ Use classic HID device to wake from suspend. """
312        device = self.devices['MOUSE'][0]
313        self.run_peer_wakeup_device('MOUSE',
314                                    device,
315                                    device_test=self._test_mouse,
316                                    iterations=STRESS_ITERATIONS)
317
318    # TODO(b/151332866) - Bob can't wake from suspend due to wrong power/wakeup
319    # TODO(b/150897528) - Dru is powered down during suspend, won't wake up
320    @test_wrapper('Peer wakeup LE HID',
321                  devices={'BLE_MOUSE': 1},
322                  skip_models=TABLET_MODELS + SUSPEND_POWER_DOWN_MODELS +
323                  ['bob'],
324                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS)
325    def sr_peer_wake_le_hid_stress(self):
326        """ Use LE HID device to wake from suspend. """
327        device = self.devices['BLE_MOUSE'][0]
328        self.run_peer_wakeup_device('BLE_MOUSE',
329                                    device,
330                                    device_test=self._test_mouse,
331                                    iterations=STRESS_ITERATIONS)
332
333    @test_wrapper('Peer wakeup with A2DP should fail',
334                  devices={'BLUETOOTH_AUDIO': 1},
335                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS)
336    def sr_peer_wake_a2dp_should_fail(self):
337        """ Use A2DP device to wake from suspend and fail. """
338        device_type = 'BLUETOOTH_AUDIO'
339        device = self.devices[device_type][0]
340        self.initialize_bluetooth_audio(device, A2DP)
341        self.run_peer_wakeup_device(
342                device_type,
343                device,
344                device_test=self.test_device_a2dp_connected,
345                should_wake=False)
346
347    # ---------------------------------------------------------------
348    # Suspend while discovering and advertising
349    # ---------------------------------------------------------------
350
351    # TODO(b/150897528) - Scarlet Dru loses firmware around suspend
352    @test_wrapper('Suspend while discovering',
353                  devices={'BLE_MOUSE': 1},
354                  skip_models=SUSPEND_POWER_DOWN_MODELS,
355                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS +
356                  SUSPEND_RESET_IF_NO_PEER_CHIPSETS)
357    def sr_while_discovering(self):
358        """ Suspend while discovering. """
359        device = self.devices['BLE_MOUSE'][0]
360        boot_id = self.host.get_boot_id()
361
362        self.test_device_set_discoverable(device, True)
363
364        # Test discovery without setting discovery filter
365        # ----------------------------------------------------------------------
366        suspend = self.suspend_async(suspend_time=EXPECT_NO_WAKE_SUSPEND_SEC)
367        start_time = self.bluetooth_facade.get_device_utc_time()
368
369        # We don't pair to the peer device because we don't want it in the
370        # allowlist. However, we want an advertising peer in this test
371        # responding to the discovery requests.
372        self.test_start_discovery()
373        self.test_suspend_and_wait_for_sleep(suspend,
374                                             sleep_timeout=SUSPEND_SEC)
375
376        # If discovery events wake us early, we will raise and suspend.exitcode
377        # will be non-zero
378        self.test_wait_for_resume(boot_id,
379                                  suspend,
380                                  resume_timeout=EXPECT_NO_WAKE_SUSPEND_SEC,
381                                  test_start_time=start_time)
382
383        # Discovering should restore after suspend
384        self.test_is_discovering()
385        self.test_stop_discovery()
386
387        # Test discovery with discovery filter set
388        # ----------------------------------------------------------------------
389        suspend = self.suspend_async(suspend_time=EXPECT_NO_WAKE_SUSPEND_SEC)
390        start_time = self.bluetooth_facade.get_device_utc_time()
391
392        self.test_set_discovery_filter({'Transport': 'auto'})
393        self.test_start_discovery()
394        self.test_suspend_and_wait_for_sleep(suspend,
395                                             sleep_timeout=SUSPEND_SEC)
396
397        # If discovery events wake us early, we will raise and suspend.exitcode
398        # will be non-zero
399        self.test_wait_for_resume(boot_id,
400                                  suspend,
401                                  resume_timeout=EXPECT_NO_WAKE_SUSPEND_SEC,
402                                  test_start_time=start_time)
403
404        # Discovering should restore after suspend
405        self.test_is_discovering()
406        self.test_stop_discovery()
407
408    # TODO(b/150897528) - Scarlet Dru loses firmware around suspend
409    @test_wrapper('Suspend while advertising',
410                  devices={'MOUSE': 1},
411                  skip_models=SUSPEND_POWER_DOWN_MODELS,
412                  skip_chipsets=SUSPEND_POWER_DOWN_CHIPSETS +
413                  SUSPEND_RESET_IF_NO_PEER_CHIPSETS)
414    def sr_while_advertising(self):
415        """ Suspend while advertising. """
416        device = self.devices['MOUSE'][0]
417        boot_id = self.host.get_boot_id()
418        suspend = self.suspend_async(suspend_time=EXPECT_NO_WAKE_SUSPEND_SEC)
419        start_time = self.bluetooth_facade.get_device_utc_time()
420
421        self.test_discoverable()
422        self.test_suspend_and_wait_for_sleep(suspend,
423                                             sleep_timeout=SUSPEND_SEC)
424
425        # Peer device should not be able to discover us in suspend
426        self.test_discover_by_device_fails(device)
427
428        self.test_wait_for_resume(boot_id,
429                                  suspend,
430                                  resume_timeout=EXPECT_NO_WAKE_SUSPEND_SEC,
431                                  test_start_time=start_time)
432
433        # Test that we are properly discoverable again
434        self.test_is_discoverable()
435        self.test_discover_by_device(device)
436
437        self.test_nondiscoverable()
438
439    # ---------------------------------------------------------------
440    # Health checks
441    # ---------------------------------------------------------------
442
443    @test_wrapper('Suspend while powered off', devices={'MOUSE': 1})
444    def sr_while_powered_off(self):
445        """ Suspend while adapter is powered off. """
446        device = self.devices['MOUSE'][0]
447        boot_id = self.host.get_boot_id()
448        suspend = self.suspend_async(suspend_time=SUSPEND_SEC)
449        start_time = self.bluetooth_facade.get_device_utc_time()
450
451        # Pair device so we have something to do in suspend
452        self.assert_discover_and_pair(device)
453
454        # Trigger power down and quickly suspend
455        self.test_power_off_adapter()
456        self.test_suspend_and_wait_for_sleep(suspend,
457                                             sleep_timeout=SUSPEND_SEC)
458        # Suspend and resume should succeed
459        self.test_wait_for_resume(boot_id,
460                                  suspend,
461                                  resume_timeout=SUSPEND_SEC,
462                                  test_start_time=start_time)
463
464        # We should be able to power it back on
465        self.test_power_on_adapter()
466
467        # Test that we can reconnect to the device after powering back on
468        self.test_connection_by_device(device)
469
470    @batch_wrapper('SR with Peer Health')
471    def sr_health_batch_run(self, num_iterations=1, test_name=None):
472        """ Batch of suspend/resume peer health tests. """
473        self.sr_reconnect_classic_hid()
474        self.sr_reconnect_le_hid()
475        self.sr_peer_wake_classic_hid()
476        self.sr_peer_wake_le_hid()
477        self.sr_while_discovering()
478        self.sr_while_advertising()
479        self.sr_reconnect_multiple_classic_hid()
480        self.sr_reconnect_multiple_le_hid()
481        self.sr_reconnect_multiple_classic_le_hid()
482
483    def run_once(self,
484                 host,
485                 num_iterations=1,
486                 args_dict=None,
487                 test_name=None,
488                 flag='Quick Health'):
489        """Running Bluetooth adapter suspend resume with peer autotest.
490
491        @param host: the DUT, usually a chromebook
492        @param num_iterations: the number of times to execute the test
493        @param test_name: the test to run or None for all tests
494        @param flag: run tests with this flag (default: Quick Health)
495
496        """
497
498        # Initialize and run the test batch or the requested specific test
499        self.quick_test_init(host,
500                             use_btpeer=True,
501                             flag=flag,
502                             args_dict=args_dict)
503        self.sr_health_batch_run(num_iterations, test_name)
504        self.quick_test_cleanup()
505