1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16
17from mobly.asserts import assert_equal, fail
18
19from pairing.test_base import PairTestBase
20
21from avatar import asynchronous
22
23from bumble.att import (
24    ATT_Error,
25    ATT_INSUFFICIENT_AUTHENTICATION_ERROR,
26    ATT_INSUFFICIENT_ENCRYPTION_ERROR,
27)
28
29from bumble.gatt import (
30    Service,
31    Characteristic,
32    CharacteristicValue,
33)
34
35from bumble.pairing import PairingConfig
36
37from pandora.host_pb2 import (
38    RANDOM,
39    DataTypes,
40)
41
42from pandora.security_pb2 import (
43    LE_LEVEL3,
44    PairingEventAnswer,
45)
46
47from pandora_experimental.gatt_grpc_aio import GATT as AioGATT
48
49from pandora_experimental.gatt_pb2 import DiscoverServicesRequest
50
51
# One-shot latches shared by the two GATT handlers below: slot 0 is flipped by
# the read handler and slot 1 by the write handler the first time each of them
# rejects an access on an encrypted link with an authentication error.
AUTHENTICATION_ERROR_RETURNED = [False, False]


def _gatt_read_with_error(connection):
    '''
    Characteristic read handler that fails until security has been raised.

    Raises ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) while `connection` is
    not encrypted. On an encrypted link the first call latches slot 0 and
    raises ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR); once the latch is
    set the handler returns a single byte of value 1.
    '''
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)

    read_slot = 0
    if not AUTHENTICATION_ERROR_RETURNED[read_slot]:
        # first encrypted read: remember that the error was handed out
        AUTHENTICATION_ERROR_RETURNED[read_slot] = True
        raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)

    return b'\x01'


def _gatt_write_with_error(connection, _value):
    '''
    Characteristic write handler mirroring the read handler.

    Raises ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) while `connection` is
    not encrypted. On an encrypted link the first call latches slot 1 and
    raises ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR); later calls
    return None. The written value is ignored.
    '''
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)

    write_slot = 1
    if AUTHENTICATION_ERROR_RETURNED[write_slot]:
        # the error was already handed out once: accept the write
        return None

    AUTHENTICATION_ERROR_RETURNED[write_slot] = True
    raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)
72
class BLEPairTestBase(PairTestBase):
    '''
    LE steps for the pairing tests: connection, pairing and GATT access.

    The methods read the role attributes `acl_initiator`, `acl_responder`,
    `pairing_initiator` and `pairing_responder`, as well as `dut` and `ref`,
    from the instance; they must be assigned before a step is started.
    '''

    async def start_acl_connection(self):
        '''
        Connect `self.acl_initiator` to `self.acl_responder` over LE.

        The responder advertises with a marker in its manufacturer specific
        data, the initiator scans until it sees an advertisement carrying that
        marker and then connects to the advertiser's address.

        The scan and advertising streams are cancelled on every exit path,
        including failures and task cancellation.

        Returns:
            A `(init_res, resp_res)` tuple: the initiator's `ConnectLE`
            response and the first item yielded by the responder's
            advertising stream.

        Fails the test (through `assert_equal`) when the initiator's result
        variant is not 'connection'.
        '''
        adv_seed = b'pause cafe'

        # NOTE(review): the address type is taken from bumble's PairingConfig
        # while RANDOM from pandora.host_pb2 is imported by this file and not
        # used -- confirm which enum the host API is meant to receive.

        # responder starts advertising
        resp_advertisement = self.acl_responder.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=PairingConfig.AddressType.RANDOM,
            data=DataTypes(manufacturer_specific_data=adv_seed),
        )
        try:
            # initiator starts scanning
            init_scanning = self.acl_initiator.aio.host.Scan(own_address_type=PairingConfig.AddressType.RANDOM)
            try:
                init_scan_res = await anext(
                    (x async for x in init_scanning if adv_seed in x.data.manufacturer_specific_data)
                )
            finally:
                # stop scanning whether or not the responder was found
                init_scanning.cancel()

            # connect, and wait for the responder to report the connection
            init_res, resp_res = await asyncio.gather(
                self.acl_initiator.aio.host.ConnectLE(
                    own_address_type=PairingConfig.AddressType.RANDOM, **init_scan_res.address_asdict()
                ),
                anext(aiter(resp_advertisement)),
            )
        finally:
            # never leave the responder advertising, even on failure
            resp_advertisement.cancel()

        assert_equal(init_res.result_variant(), 'connection')

        return init_res, resp_res

    async def start_pairing(
        self,
        initiator_acl_connection,
        responder_acl_connection,
    ):
        '''
        Run LE_LEVEL3 pairing between the pairing initiator and responder.

        `Secure` is issued on `self.pairing_initiator` while
        `self.pairing_responder` concurrently waits with `WaitSecurity`.

        Args:
            initiator_acl_connection: connection as seen by the pairing
                initiator.
            responder_acl_connection: connection as seen by the pairing
                responder.

        Returns:
            A `(init_res, resp_res)` tuple with both security responses.

        Fails the test (through `assert_equal`) unless both result variants
        are 'success'.
        '''
        init_res, resp_res = await asyncio.gather(
            self.pairing_initiator.aio.security.Secure(connection=initiator_acl_connection, le=LE_LEVEL3),
            self.pairing_responder.aio.security.WaitSecurity(connection=responder_acl_connection, le=LE_LEVEL3),
        )

        # verify that pairing succeeded
        assert_equal(init_res.result_variant(), 'success')
        assert_equal(resp_res.result_variant(), 'success')

        return init_res, resp_res

    async def start_service_access(
        self,
        initiator_acl_connection,
        responder_acl_connection,
    ):
        '''
        Trigger pairing by reading a protected GATT characteristic from the DUT.

        A service whose characteristic handlers answer with security errors is
        registered on `self.ref.device`; the DUT then discovers services and
        reads every characteristic matching the test UUID.

        Args:
            initiator_acl_connection: DUT-side connection used for the GATT
                requests.
            responder_acl_connection: not used by this step; kept so the
                signature stays parallel to `start_pairing`.

        Returns:
            True once discovery and the read(s) have completed.
        '''
        # the GATT requests below are issued by the DUT over the initiator
        # connection, so the DUT must be the ACL initiator
        assert_equal(self.dut, self.acl_initiator)
        serv_uuid = '50DB505C-8AC4-4738-8448-3B1D9CC09CC5'
        char_uuid = '552957FB-CF1F-4A31-9535-E78847E1A714'

        # Re-arm the one-shot error latches. They live at module level, so
        # without this a second run in the same process would never receive
        # the authentication error that is meant to trigger pairing.
        AUTHENTICATION_ERROR_RETURNED[:] = [False, False]

        # to trigger pairing,
        # add a GATT service with some characteristic on bumble
        # access to which will return security error
        self.ref.device.add_service(
            Service(
                serv_uuid,
                [
                    Characteristic(
                        char_uuid,
                        Characteristic.Properties.READ
                        | Characteristic.Properties.WRITE,
                        Characteristic.READABLE | Characteristic.WRITEABLE,
                        CharacteristicValue(
                            read=_gatt_read_with_error, write=_gatt_write_with_error
                        ),
                    )
                ],
            )
        )

        dut_gatt = AioGATT(self.dut.aio.channel)

        services = await dut_gatt.DiscoverServices(connection=initiator_acl_connection)
        for service in services.services:
            for char in service.characteristics:
                if char.uuid == char_uuid:
                    await dut_gatt.ReadCharacteristicFromHandle(
                        handle=char.handle,
                        connection=initiator_acl_connection,
                    )

        return True
161
162
class BLEPairTestBaseWithGeneralPairingTests(BLEPairTestBase):
    '''
    Adds the general pairing test, where pairing is provoked by a service access.
    '''

    @asynchronous
    async def test_general_pairing(self) -> None:
        '''
        acl:
            ref: responder
            dut: initiator

        pairing:
            ref: responder
            dut: initiator

        service access:
            ref: responder
            dut: initiator

        Pairing is not requested explicitly: the service access step runs in
        the background while the pairing is accepted, and has to complete
        within 5 seconds afterwards.
        '''
        # role setup
        self.acl_initiator = self.dut
        self.acl_responder = self.ref
        self.pairing_initiator = self.dut
        self.pairing_responder = self.ref
        self.service_initiator = self.dut
        self.service_responder = self.ref

        self.prepare_pairing()

        android_res, bumble_res = await self.start_acl_connection()

        service_access_task = asyncio.create_task(
            self.start_service_access(android_res.connection, bumble_res.connection)
        )

        await self.accept_pairing()

        try:
            await asyncio.wait_for(service_access_task, timeout=5.0)
        except Exception as error:
            # Only ordinary errors (timeout included) become a test failure;
            # task cancellation and KeyboardInterrupt propagate untouched.
            # The cause is reported so the failure can be diagnosed.
            fail(f"connection should have succeeded: {error!r}")
187
188
class BLEPairTestBaseWithGeneralAndDedicatedPairingTests(BLEPairTestBaseWithGeneralPairingTests):
    '''
    Adds the dedicated pairing tests, covering every combination of ACL
    initiator and pairing initiator between the DUT and the reference device.
    '''

    async def _connect_and_pair(self, *, pairing_started_by_acl_initiator: bool) -> None:
        '''
        Shared tail of the dedicated pairing tests.

        Expects the role attributes to be assigned by the calling test, then
        prepares pairing, establishes the ACL connection and runs pairing in
        the background while the pairing is accepted. Pairing has to complete
        within 10 seconds afterwards.

        Args:
            pairing_started_by_acl_initiator: True when the pairing initiator
                is the device that initiated the ACL connection, False when it
                is the ACL responder. Selects which side's connection is
                handed to `start_pairing` as the initiator connection.
        '''
        self.prepare_pairing()

        acl_init_res, acl_resp_res = await self.start_acl_connection()

        # start_pairing takes (pairing initiator, pairing responder), whereas
        # start_acl_connection returns (acl initiator, acl responder)
        if pairing_started_by_acl_initiator:
            pairing_init_res, pairing_resp_res = acl_init_res, acl_resp_res
        else:
            pairing_init_res, pairing_resp_res = acl_resp_res, acl_init_res

        pairing_task = asyncio.create_task(
            self.start_pairing(pairing_init_res.connection, pairing_resp_res.connection)
        )
        await self.accept_pairing()
        await asyncio.wait_for(pairing_task, timeout=10.0)

    @asynchronous
    async def test_dedicated_pairing_ref_initiate_1(self) -> None:
        '''
        acl:
            ref: initiator
            dut: responder

        pairing:
            ref: initiator
            dut: responder
        '''

        self.acl_initiator = self.ref
        self.acl_responder = self.dut
        self.pairing_initiator = self.ref
        self.pairing_responder = self.dut

        await self._connect_and_pair(pairing_started_by_acl_initiator=True)

    @asynchronous
    async def test_dedicated_pairing_ref_initiate_2(self) -> None:
        '''
        acl:
            ref: initiator
            dut: responder

        pairing:
            ref: responder
            dut: initiator
        '''

        self.acl_initiator = self.ref
        self.acl_responder = self.dut
        self.pairing_initiator = self.dut
        self.pairing_responder = self.ref

        await self._connect_and_pair(pairing_started_by_acl_initiator=False)

    @asynchronous
    async def test_dedicated_pairing_dut_initiate_1(self) -> None:
        '''
        acl:
            ref: responder
            dut: initiator

        pairing:
            ref: responder
            dut: initiator
        '''

        self.acl_initiator = self.dut
        self.acl_responder = self.ref
        self.pairing_initiator = self.dut
        self.pairing_responder = self.ref

        await self._connect_and_pair(pairing_started_by_acl_initiator=True)

    @asynchronous
    async def test_dedicated_pairing_dut_initiate_2(self) -> None:
        '''
        acl:
            ref: responder
            dut: initiator

        pairing:
            ref: initiator
            dut: responder
        '''

        self.acl_initiator = self.dut
        self.acl_responder = self.ref
        self.pairing_initiator = self.ref
        self.pairing_responder = self.dut

        await self._connect_and_pair(pairing_started_by_acl_initiator=False)
290