# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

from mobly.asserts import assert_equal, fail

from pairing.test_base import PairTestBase

from avatar import asynchronous

from bumble.att import (
    ATT_Error,
    ATT_INSUFFICIENT_AUTHENTICATION_ERROR,
    ATT_INSUFFICIENT_ENCRYPTION_ERROR,
)

from bumble.gatt import (
    Service,
    Characteristic,
    CharacteristicValue,
)

from bumble.pairing import PairingConfig

from pandora.host_pb2 import (
    RANDOM,
    DataTypes,
)

from pandora.security_pb2 import (
    LE_LEVEL3,
    PairingEventAnswer,
)

from pandora_experimental.gatt_grpc_aio import GATT as AioGATT

from pandora_experimental.gatt_pb2 import DiscoverServicesRequest


# One-shot flags remembering whether the "insufficient authentication" error
# has already been returned: index 0 is used by the read handler, index 1 by
# the write handler.
# NOTE(review): these flags are module-global and nothing in this file resets
# them, so they stay set for the lifetime of the process once triggered.
AUTHENTICATION_ERROR_RETURNED = [False, False]


def _gatt_read_with_error(connection):
    '''
    GATT read handler that rejects the first read on an encrypted link.

    Raises ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) whenever the
    connection is not encrypted. On an encrypted connection, the first call
    raises ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) and records that
    it did so; every later call returns the single byte 0x01.
    '''
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)

    if AUTHENTICATION_ERROR_RETURNED[0]:
        return bytes([1])

    AUTHENTICATION_ERROR_RETURNED[0] = True
    raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)


def _gatt_write_with_error(connection, _value):
    '''
    GATT write handler that rejects the first write on an encrypted link.

    Raises ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) whenever the
    connection is not encrypted. On an encrypted connection, the first call
    raises ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) and records that
    it did so; every later call accepts the write and ignores the value.
    '''
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)

    if not AUTHENTICATION_ERROR_RETURNED[1]:
        AUTHENTICATION_ERROR_RETURNED[1] = True
        raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)


class BLEPairTestBase(PairTestBase):
    '''
    LE flavour of the pairing test base.

    The methods below read the role attributes (`acl_initiator`,
    `acl_responder`, `pairing_initiator`, `pairing_responder`) that each test
    assigns before calling them, as well as `dut` and `ref`, which are not
    defined in this file (presumably provided by PairTestBase).
    '''

    async def start_acl_connection(self):
        '''
        Establish an LE connection from the ACL initiator to the ACL responder.

        The responder advertises with a known manufacturer-specific payload,
        the initiator scans until it sees that payload and then connects.

        Returns:
            A tuple (init_res, resp_res): the initiator's ConnectLE response
            and the first item yielded by the responder's Advertise stream.
        '''
        adv_seed = b'pause cafe'
        # responder starts advertising
        # NOTE(review): the address type passed to the pandora host API here
        # is bumble's PairingConfig.AddressType.RANDOM, not the imported
        # pandora RANDOM - confirm this is intended.
        resp_advertisement = self.acl_responder.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=PairingConfig.AddressType.RANDOM,
            data=DataTypes(manufacturer_specific_data=adv_seed),
        )

        # initiator starts scanning, and stops at the first report that
        # carries the seed in its manufacturer specific data
        init_scanning = self.acl_initiator.aio.host.Scan(own_address_type=PairingConfig.AddressType.RANDOM)
        init_scan_res = await anext(
            (x async for x in init_scanning if adv_seed in x.data.manufacturer_specific_data)
        )
        init_scanning.cancel()

        # connect from the initiator while waiting for the first event of the
        # responder's advertisement stream
        init_res, resp_res = await asyncio.gather(
            self.acl_initiator.aio.host.ConnectLE(
                own_address_type=PairingConfig.AddressType.RANDOM, **init_scan_res.address_asdict()
            ),
            anext(aiter(resp_advertisement)),
        )

        resp_advertisement.cancel()

        assert_equal(init_res.result_variant(), 'connection')

        return init_res, resp_res

    async def start_pairing(
        self,
        initiator_acl_connection,
        responder_acl_connection,
    ):
        '''
        Run pairing between the pairing initiator and the pairing responder.

        The initiator calls Secure and the responder calls WaitSecurity, both
        with LE_LEVEL3, concurrently; both results must be 'success'.

        Args:
            initiator_acl_connection: connection handle on the pairing
                initiator's side.
            responder_acl_connection: connection handle on the pairing
                responder's side.

        Returns:
            A tuple (init_res, resp_res) with the Secure and WaitSecurity
            responses.
        '''
        init_res, resp_res = await asyncio.gather(
            self.pairing_initiator.aio.security.Secure(connection=initiator_acl_connection, le=LE_LEVEL3),
            self.pairing_responder.aio.security.WaitSecurity(connection=responder_acl_connection, le=LE_LEVEL3),
        )

        # verify that pairing succeeded
        assert_equal(init_res.result_variant(), 'success')
        assert_equal(resp_res.result_variant(), 'success')

        return init_res, resp_res

    async def start_service_access(
        self,
        initiator_acl_connection,
        responder_acl_connection,
    ):
        '''
        Trigger pairing by reading a protected characteristic from the DUT.

        Registers a GATT service on the reference device whose characteristic
        handlers raise security errors, then has the DUT discover services and
        read every characteristic matching the test characteristic UUID.

        Args:
            initiator_acl_connection: DUT-side connection used for the GATT
                discovery and read.
            responder_acl_connection: not used by this method.

        Returns:
            True, unconditionally, once discovery and any reads completed.
        '''
        # acl connection initiated from bumble
        assert_equal(self.dut, self.acl_initiator)
        serv_uuid = '50DB505C-8AC4-4738-8448-3B1D9CC09CC5'
        char_uuid = '552957FB-CF1F-4A31-9535-E78847E1A714'

        # to trigger pairing,
        # add a GATT service with some characteristic on bumble
        # access to which will return security error
        self.ref.device.add_service(
            Service(
                serv_uuid,
                [
                    Characteristic(
                        char_uuid,
                        Characteristic.Properties.READ
                        | Characteristic.Properties.WRITE,
                        Characteristic.READABLE | Characteristic.WRITEABLE,
                        CharacteristicValue(
                            read=_gatt_read_with_error, write=_gatt_write_with_error
                        ),
                    )
                ],
            )
        )

        dut_gatt = AioGATT(self.dut.aio.channel)

        services = await dut_gatt.DiscoverServices(connection=initiator_acl_connection)
        for service in services.services:
            for char in service.characteristics:
                if char.uuid == char_uuid:
                    await dut_gatt.ReadCharacteristicFromHandle(handle=char.handle,
                                                                connection=initiator_acl_connection)

        return True


class BLEPairTestBaseWithGeneralPairingTests(BLEPairTestBase):

    @asynchronous
    async def test_general_pairing(self) -> None:
        '''
        acl:
            ref: responder
            dut: initiator

        pairing:
            ref: responder
            dut: initiator

        pairing is triggered by the dut accessing a protected
        GATT characteristic hosted on the ref
        '''
        # role setup
        self.acl_initiator = self.dut
        self.acl_responder = self.ref
        self.pairing_initiator = self.dut
        self.pairing_responder = self.ref
        self.service_initiator = self.dut
        self.service_responder = self.ref

        self.prepare_pairing()

        android_res, bumble_res = await self.start_acl_connection()

        service_access_task = asyncio.create_task(
            self.start_service_access(android_res.connection, bumble_res.connection)
        )

        await self.accept_pairing()

        try:
            await asyncio.wait_for(service_access_task, timeout=5.0)
        except Exception:
            # Deliberately not a bare `except:` so that task cancellation
            # (asyncio.CancelledError), KeyboardInterrupt and SystemExit
            # propagate instead of being reported as a test failure. Timeouts
            # and errors raised by the service access still fail the test, and
            # the original exception stays attached as the failure's context.
            fail("connection should have succeeded")


class BLEPairTestBaseWithGeneralAndDedicatedPairingTests(BLEPairTestBaseWithGeneralPairingTests):

    @asynchronous
    async def test_dedicated_pairing_ref_initiate_1(self) -> None:
        '''
        acl:
            ref: initiator
            dut: responder

        pairing:
            ref: initiator
            dut: responder
        '''

        self.acl_initiator = self.ref
        self.acl_responder = self.dut
        self.pairing_initiator = self.ref
        self.pairing_responder = self.dut

        self.prepare_pairing()

        bumble_res, android_res = await self.start_acl_connection()

        pairing_task = asyncio.create_task(self.start_pairing(bumble_res.connection, android_res.connection))
        await self.accept_pairing()
        await asyncio.wait_for(pairing_task, timeout=10.0)

    @asynchronous
    async def test_dedicated_pairing_ref_initiate_2(self) -> None:
        '''
        acl:
            ref: initiator
            dut: responder

        pairing:
            ref: responder
            dut: initiator
        '''

        self.acl_initiator = self.ref
        self.acl_responder = self.dut
        self.pairing_initiator = self.dut
        self.pairing_responder = self.ref

        self.prepare_pairing()

        bumble_res, android_res = await self.start_acl_connection()

        pairing_task = asyncio.create_task(self.start_pairing(android_res.connection, bumble_res.connection))
        await self.accept_pairing()
        await asyncio.wait_for(pairing_task, timeout=10.0)

    @asynchronous
    async def test_dedicated_pairing_dut_initiate_1(self) -> None:
        '''
        acl:
            ref: responder
            dut: initiator

        pairing:
            ref: responder
            dut: initiator
        '''

        self.acl_initiator = self.dut
        self.acl_responder = self.ref
        self.pairing_initiator = self.dut
        self.pairing_responder = self.ref

        self.prepare_pairing()

        android_res, bumble_res = await self.start_acl_connection()

        pairing_task = asyncio.create_task(self.start_pairing(android_res.connection, bumble_res.connection))
        await self.accept_pairing()
        await asyncio.wait_for(pairing_task, timeout=10.0)

    @asynchronous
    async def test_dedicated_pairing_dut_initiate_2(self) -> None:
        '''
        acl:
            ref: responder
            dut: initiator

        pairing:
            ref: initiator
            dut: responder
        '''

        self.acl_initiator = self.dut
        self.acl_responder = self.ref
        self.pairing_initiator = self.ref
        self.pairing_responder = self.dut

        self.prepare_pairing()

        android_res, bumble_res = await self.start_acl_connection()

        pairing_task = asyncio.create_task(self.start_pairing(bumble_res.connection, android_res.connection))
        await self.accept_pairing()
        await asyncio.wait_for(pairing_task, timeout=10.0)