xref: /aosp_15_r20/external/autotest/server/cros/bluetooth/bluetooth_adapter_better_together.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
"""Provides most of the test logic for Bluetooth Better Together."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import logging
13import base64
14import json
15
16import common
17from autotest_lib.server.cros.bluetooth.bluetooth_adapter_quick_tests import (
18        BluetoothAdapterQuickTests)
19from autotest_lib.server.cros.bluetooth.bluetooth_adapter_pairing_tests import (
20        BluetoothAdapterPairingTests)
21from autotest_lib.server.cros.bluetooth.bluetooth_adapter_tests import (
22        test_retry_and_log)
23from six.moves import range
24
class BluetoothAdapterBetterTogether(BluetoothAdapterQuickTests,
                                     BluetoothAdapterPairingTests):
  """Batch of Bluetooth LE tests covering the Better Together feature.

     The tests are written as one batch in order to reduce test time, since
     auto-test ramp up time is costly. The batch uses the
     BluetoothAdapterQuickTests wrapper methods to start and end a test and
     a batch of tests.

     This class can be called to run the entire test batch or to run a
     specific test only.
  """

  # UUIDs used to locate the Better Together GATT service and its two
  # characteristics in the peer's GATT attributes map.
  BETTER_TOGETHER_SERVICE_UUID = 'b3b7e28e-a000-3e17-bd86-6e97b9e28c11'
  CLIENT_RX_CHARACTERISTIC_UUID = '00000100-0004-1000-8000-001a11000102'
  CLIENT_TX_CHARACTERISTIC_UUID = '00000100-0004-1000-8000-001a11000101'

  # Value handed to test_start_notify for the RX characteristic.
  CCCD_VALUE_INDICATION = 0x02

  # Number of times test_smart_unlock repeats the whole flow.
  TEST_ITERATION = 3

  # The following messages were captured during a smart unlock process. These
  # are the messages exchanged between the phone and the chromebook to
  # authorize each other in order to unlock the chromebook.
  #
  # MESSAGE_1..MESSAGE_4 each consist of a few leading binary bytes followed
  # by an ASCII JSON object of the form {"feature":...,"payload":...}.
  CONNECTION_OPEN_MESSAGE = b'\x80\x00\x01\x00\x01\x00\x00'

  # JSON part carries "feature":"auth".
  MESSAGE_1 = (
      b'\x1c\x03\x00\xc7\x7b\x22\x66\x65\x61\x74\x75\x72'
      b'\x65\x22\x3a\x22\x61\x75\x74\x68\x22\x2c\x22\x70'
      b'\x61\x79\x6c\x6f\x61\x64\x22\x3a\x22\x43\x6c\x6b'
      b'\x4b\x56\x41\x67\x42\x45\x41\x45\x79\x54\x67\x70'
      b'\x4b\x43\x41\x45\x53\x52\x67\x6f\x68\x41\x49\x37'
      b'\x6e\x78\x63\x34\x6b\x44\x4d\x68\x6d\x68\x6e\x4d'
      b'\x75\x69\x7a\x79\x62\x47\x54\x5f\x31\x4e\x6c\x52'
      b'\x4c\x6c\x6d\x32\x59\x34\x72\x35\x78\x6f\x6e\x69'
      b'\x47\x51\x35\x49\x6a\x45\x69\x45\x41\x75\x45\x64'
      b'\x43\x4c\x37\x36\x44\x70\x30\x70\x6a\x6a\x75\x74'
      b'\x56\x6a\x47\x35\x38\x62\x55\x49\x57\x42\x57\x49'
      b'\x41\x4f\x5a\x39\x38\x46\x43\x49\x4f\x79\x58\x68'
      b'\x2d\x6b\x66\x6f\x51\x41\x52\x49\x42\x72\x68\x49'
      b'\x67\x7a\x35\x62\x49\x37\x53\x6f\x59\x47\x56\x36'
      b'\x6b\x4e\x4e\x74\x6d\x73\x59\x53\x6a\x75\x68\x4d'
      b'\x68\x61\x66\x51\x6c\x4d\x44\x55\x68\x78\x42\x36'
      b'\x44\x50\x52\x79\x4c\x5a\x62\x30\x3d\x22\x7d'
  )

  # JSON part carries "feature":"auth".
  MESSAGE_2 = (
      b'\x2c\x03\x00\xfb\x7b\x22\x66\x65\x61\x74\x75\x72'
      b'\x65\x22\x3a\x22\x61\x75\x74\x68\x22\x2c\x22\x70'
      b'\x61\x79\x6c\x6f\x61\x64\x22\x3a\x22\x43\x6f\x41'
      b'\x42\x43\x68\x77\x49\x41\x52\x41\x43\x4b\x68\x43'
      b'\x34\x69\x38\x45\x47\x73\x76\x71\x6e\x78\x5f\x66'
      b'\x66\x4c\x33\x59\x42\x61\x6b\x35\x59\x4d\x67\x51'
      b'\x49\x44\x52\x41\x42\x45\x6d\x42\x41\x71\x5f\x47'
      b'\x62\x72\x49\x42\x6c\x42\x42\x76\x73\x6a\x73\x32'
      b'\x67\x47\x56\x59\x70\x76\x73\x50\x6f\x64\x57\x6e'
      b'\x70\x50\x42\x6e\x5a\x71\x41\x61\x46\x6b\x30\x59'
      b'\x54\x48\x76\x79\x6b\x55\x76\x6b\x4a\x79\x6c\x35'
      b'\x56\x54\x7a\x48\x53\x35\x43\x6e\x41\x65\x56\x4d'
      b'\x79\x38\x49\x76\x44\x5f\x67\x65\x6d\x4e\x65\x42'
      b'\x61\x76\x58\x70\x70\x52\x62\x4b\x43\x42\x34\x5f'
      b'\x35\x35\x79\x4a\x50\x4e\x6b\x5f\x76\x45\x66\x53'
      b'\x38\x57\x5a\x7a\x6d\x58\x64\x5a\x4d\x68\x72\x74'
      b'\x31\x61\x6d\x67\x46\x7a\x6e\x35\x4b\x65\x61\x2d'
      b'\x77\x5f\x58\x55\x53\x49\x47\x54\x6e\x55\x33\x6d'
      b'\x68\x45\x36\x67\x63\x5a\x4c\x4b\x49\x52\x79\x4d'
      b'\x61\x39\x68\x78\x41\x4f\x30\x4b\x6f\x7a\x6d\x68'
      b'\x43\x71\x6d\x4d\x42\x54\x4a\x6e\x57\x4e\x6f\x52'
      b'\x62\x22\x7d'
  )

  # JSON part carries "feature":"easy_unlock".
  MESSAGE_3 = (
      b'\x3c\x03\x00\xae\x7b\x22\x66\x65\x61\x74\x75\x72'
      b'\x65\x22\x3a\x22\x65\x61\x73\x79\x5f\x75\x6e\x6c'
      b'\x6f\x63\x6b\x22\x2c\x22\x70\x61\x79\x6c\x6f\x61'
      b'\x64\x22\x3a\x22\x43\x6b\x41\x4b\x48\x41\x67\x42'
      b'\x45\x41\x49\x71\x45\x44\x30\x72\x52\x69\x54\x77'
      b'\x32\x41\x6e\x6e\x5f\x5f\x5f\x59\x33\x6c\x68\x55'
      b'\x53\x65\x45\x79\x42\x41\x67\x4e\x45\x41\x45\x53'
      b'\x49\x46\x59\x61\x64\x36\x33\x43\x75\x52\x34\x67'
      b'\x37\x77\x4d\x41\x4b\x61\x65\x76\x2d\x72\x7a\x38'
      b'\x66\x64\x6f\x47\x46\x2d\x67\x44\x4c\x6a\x37\x50'
      b'\x47\x62\x43\x6f\x57\x54\x66\x6f\x45\x69\x43\x38'
      b'\x4a\x56\x62\x4c\x48\x75\x49\x35\x42\x76\x38\x43'
      b'\x4c\x74\x73\x53\x51\x35\x50\x4c\x6e\x72\x5a\x41'
      b'\x70\x68\x6b\x75\x4c\x78\x2d\x56\x59\x64\x6c\x32'
      b'\x53\x4d\x71\x5f\x36\x41\x3d\x3d\x22\x7d'
  )

  # JSON part carries "feature":"easy_unlock".
  MESSAGE_4 = (
      b'\x4c\x03\x00\xc2\x7b\x22\x66\x65\x61\x74\x75\x72'
      b'\x65\x22\x3a\x22\x65\x61\x73\x79\x5f\x75\x6e\x6c'
      b'\x6f\x63\x6b\x22\x2c\x22\x70\x61\x79\x6c\x6f\x61'
      b'\x64\x22\x3a\x22\x43\x6c\x41\x4b\x48\x41\x67\x42'
      b'\x45\x41\x49\x71\x45\x4d\x61\x4e\x6c\x75\x43\x56'
      b'\x63\x72\x59\x37\x68\x70\x34\x68\x46\x74\x69\x6f'
      b'\x45\x4d\x6b\x79\x42\x41\x67\x4e\x45\x41\x45\x53'
      b'\x4d\x47\x6c\x68\x33\x55\x68\x44\x77\x58\x4b\x70'
      b'\x58\x46\x5f\x5a\x6e\x4f\x61\x30\x36\x4a\x48\x4b'
      b'\x69\x6b\x56\x68\x51\x62\x32\x50\x4d\x36\x62\x62'
      b'\x76\x78\x51\x6a\x47\x50\x47\x73\x66\x79\x42\x5a'
      b'\x74\x52\x67\x39\x5f\x65\x36\x6f\x42\x5a\x79\x5f'
      b'\x74\x35\x42\x45\x74\x78\x49\x67\x51\x75\x37\x42'
      b'\x62\x6e\x75\x4e\x57\x64\x67\x6d\x42\x36\x4d\x47'
      b'\x75\x68\x54\x79\x34\x33\x43\x37\x32\x2d\x58\x44'
      b'\x58\x35\x4b\x4f\x6a\x67\x6d\x56\x74\x48\x58\x41'
      b'\x68\x4e\x67\x3d\x22\x7d'
  )

  CONNECTION_CLOSE_MESSAGE = b'\xd2\x00\x00'
122
123  def test_smart_unlock(self, address):
124    """Simulate the Smart Unlock flow, here are the steps that involve
125       1. Set the discovery filter to match LE devices only.
126       2. Start the discovery.
127       3. Stop the discovery after the device was found.
128       4. Set the LE connection parameters to reduce the min and max
129          connection intervals to 6.
130       5. Connect the device.
131       6. Set the Trusted property of the device to true.
132       7. Verify all the services were resolved.
133       8. Start notification on the RX characteristic of the
134           Proximity Service.
135       9. Exchange some messages with the peer device to authorize it.
136       10. Stop the notification.
137       11. Disconnect the device.
138    """
139
140    filter = {'Transport':'le'}
141    parameters = {'MinimumConnectionInterval':6,
142                  'MaximumConnectionInterval':6}
143
144    # We don't use the control file for iteration since it will involve the
145    # device setup steps which don't reflect the real user scenario.
146    for i in range(self.TEST_ITERATION):
147      logging.debug("Test iteration %d", i)
148      self.test_set_discovery_filter(filter)
149      self.test_discover_device(address)
150
151      self.test_set_le_connection_parameters(address, parameters)
152      self.test_connection_by_adapter(address)
153
154      self.test_set_trusted(address)
155      self.test_service_resolved(address)
156      self.test_find_object_path(address)
157
158      self.test_start_notify(self.rx_object_path,
159                             self.CCCD_VALUE_INDICATION)
160      self.test_messages_exchange(
161          self.rx_object_path, self.tx_object_path, address)
162      self.test_stop_notify(self.rx_object_path)
163      self.test_disconnection_by_adapter(address)
164
165      self.test_remove_device_object(address)
166
167    return True
168
169
170  def test_smart_unlock_llt(self, address):
171    """Smart unlock flow for llt cases """
172    filter = {'Transport': 'le'}
173    parameters = {'MinimumConnectionInterval': 6,
174                  'MaximumConnectionInterval': 6}
175
176    self.test_set_discovery_filter(filter)
177    self.test_discover_device(address)
178
179    self.test_set_le_connection_parameters(address, parameters)
180    self.test_connection_by_adapter(address)
181
182    self.test_set_trusted(address)
183    self.test_service_resolved(address)
184    self.test_find_object_path(address)
185
186    self.test_start_notify(self.rx_object_path,
187                           self.CCCD_VALUE_INDICATION)
188    self.test_messages_exchange(
189        self.rx_object_path, self.tx_object_path, address)
190    self.test_stop_notify(self.rx_object_path)
191
192
193  @test_retry_and_log(False)
194  def test_remove_device_object(self, address):
195    """Test the device object can be removed from the adapter"""
196    return self.bluetooth_facade.remove_device_object(address)
197
198
199  @test_retry_and_log(False)
200  def test_find_object_path(self, address):
201    """Test the object path can be found for a device"""
202    self.tx_object_path = None
203    self.rx_object_path = None
204    attr_map = self.bluetooth_facade.get_gatt_attributes_map(address)
205    attr_map_json = json.loads(attr_map)
206    servs_json = attr_map_json['services']
207    for uuid_s in servs_json:
208      if uuid_s == self.BETTER_TOGETHER_SERVICE_UUID:
209        chrcs_json = servs_json[uuid_s]['characteristics']
210        for uuid_c in chrcs_json:
211          if uuid_c == self.CLIENT_TX_CHARACTERISTIC_UUID:
212            self.tx_object_path = chrcs_json[uuid_c]['path']
213          if uuid_c == self.CLIENT_RX_CHARACTERISTIC_UUID:
214            self.rx_object_path = chrcs_json[uuid_c]['path']
215
216    return self.tx_object_path and self.rx_object_path
217
218
219  @test_retry_and_log(False)
220  def test_messages_exchange(
221      self, rx_object_path, tx_object_path, address):
222    """Test message exchange with the peer, Better Together performs the
223       messages exchange for authorizing the peer device before unlocking
224       the Chromebook.
225    """
226    if rx_object_path is None or tx_object_path is None:
227        logging.error('Invalid object path')
228        return False
229
230    self.test_message_exchange(
231        rx_object_path,
232        tx_object_path,
233        self.CONNECTION_OPEN_MESSAGE,
234        'connection open message')
235
236    self.test_message_exchange(
237        rx_object_path,
238        tx_object_path,
239        self.MESSAGE_1,
240        'message 1')
241
242    self.test_message_exchange(
243        rx_object_path,
244        tx_object_path,
245        self.MESSAGE_2,
246        'message 2')
247
248    # Better Together gets connection info multiple times to ensure
249    # the device is close enough by checking the RSSI, here we simulate it.
250    for i in range(9):
251      self.test_get_connection_info(address)
252
253    self.test_message_exchange(
254        rx_object_path,
255        tx_object_path,
256        self.MESSAGE_3,
257        'message 3')
258
259    self.test_message_exchange(
260        rx_object_path,
261        tx_object_path,
262        self.MESSAGE_4,
263        'message 4')
264
265    for i in range(2):
266      self.test_get_connection_info(address)
267
268    self.test_message_exchange(
269        rx_object_path,
270        tx_object_path,
271        self.CONNECTION_CLOSE_MESSAGE,
272        'connection close message')
273
274    return True
275
276
277  @test_retry_and_log(False)
278  def test_message_exchange(
279      self, rx_object_path, tx_object_path, message, message_type):
280    """Test message exchange between DUT and the peer device"""
281    ret_msg = self.bluetooth_facade.exchange_messages(
282        tx_object_path, rx_object_path, message)
283
284    if not ret_msg:
285      logging.error("Failed to write message: %s", message_type)
286      return False
287
288    return base64.standard_b64decode(ret_msg) == message
289