1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19from typing import List, Optional, Type
20from typing_extensions import Self
21
22from bumble.controller import Controller
23from bumble.link import LocalLink
24from bumble.device import Device, Connection
25from bumble.host import Host
26from bumble.transport import AsyncPipeSink
27from bumble.hci import Address
28
29
30# -----------------------------------------------------------------------------
class TwoDevices:
    """Two `Device` objects whose controllers are attached to one `LocalLink`.

    ``controllers``, ``devices``, ``connections`` and ``paired`` are
    two-element lists that share the same indexing: entry ``i`` of each list
    belongs to the same side.
    """

    connections: List[Optional[Connection]]

    def __init__(self) -> None:
        """Create the link, the two controllers and the two devices."""
        # Slots written by on_connection(); they hold None until then.
        self.connections = [None, None]

        self.link = LocalLink()
        addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
        # Both controllers are created on the same link, one per address.
        self.controllers = [
            Controller(name, link=self.link, public_address=public_address)
            for name, public_address in zip(('C1', 'C2'), addresses)
        ]
        # Each device gets a host bound to the controller with the same index.
        self.devices = [
            Device(
                address=Address(public_address),
                host=Host(controller, AsyncPipeSink(controller)),
            )
            for public_address, controller in zip(addresses, self.controllers)
        ]

        # Slots written by on_paired(); they hold None until then.
        self.paired = [None, None]

    def on_connection(self, which, connection):
        """Store `connection` in the `connections` slot at index `which`."""
        self.connections[which] = connection

    def on_paired(self, which, keys):
        """Store `keys` in the `paired` slot at index `which`."""
        self.paired[which] = keys

    async def setup_connection(self) -> None:
        """Power on both devices, then connect device 0 to device 1.

        A 'connection' listener is registered on each device beforehand; it
        forwards the reported connection to `on_connection` with that device's
        index. After `connect` returns, both `connections` slots are asserted
        to be filled.
        """

        def make_listener(which):
            # Bind the index now so each device reports into its own slot.
            def listener(connection):
                return self.on_connection(which, connection)

            return listener

        # Attach listeners
        for which in (0, 1):
            self.devices[which].on('connection', make_listener(which))

        # Start
        for which in (0, 1):
            await self.devices[which].power_on()

        # Connect the two devices
        await self.devices[0].connect(self.devices[1].random_address)

        # Check the post conditions
        for which in (0, 1):
            assert self.connections[which] is not None

    def __getitem__(self, index: int) -> Device:
        """Return the device stored at `index` in `devices`."""
        return self.devices[index]

    @classmethod
    async def create_with_connection(cls: Type[Self]) -> Self:
        """Instantiate `cls`, run `setup_connection` on it and return it."""
        instance = cls()
        await instance.setup_connection()
        return instance
90
91
92# -----------------------------------------------------------------------------
async def async_barrier():
    """Yield to the running event loop for one `call_soon` round trip.

    A fresh future is created and its resolution is queued with `call_soon`;
    the coroutine then waits on that future. asyncio runs `call_soon`
    callbacks in registration order, so callbacks queued on the loop before
    this call have run by the time it returns.
    """
    loop = asyncio.get_running_loop()
    released = loop.create_future()
    loop.call_soon(released.set_result, None)
    await released
97