1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19from typing import List, Optional, Type 20from typing_extensions import Self 21 22from bumble.controller import Controller 23from bumble.link import LocalLink 24from bumble.device import Device, Connection 25from bumble.host import Host 26from bumble.transport import AsyncPipeSink 27from bumble.hci import Address 28 29 30# ----------------------------------------------------------------------------- 31class TwoDevices: 32 connections: List[Optional[Connection]] 33 34 def __init__(self) -> None: 35 self.connections = [None, None] 36 37 self.link = LocalLink() 38 addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0'] 39 self.controllers = [ 40 Controller('C1', link=self.link, public_address=addresses[0]), 41 Controller('C2', link=self.link, public_address=addresses[1]), 42 ] 43 self.devices = [ 44 Device( 45 address=Address(addresses[0]), 46 host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), 47 ), 48 Device( 49 address=Address(addresses[1]), 50 host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), 51 ), 52 ] 53 54 self.paired = [None, None] 55 56 def on_connection(self, which, connection): 57 self.connections[which] = connection 58 59 def on_paired(self, which, keys): 60 self.paired[which] = keys 61 62 async def setup_connection(self) -> None: 63 # Attach listeners 64 self.devices[0].on( 65 'connection', lambda connection: self.on_connection(0, connection) 66 ) 67 self.devices[1].on( 68 'connection', lambda connection: self.on_connection(1, connection) 69 ) 70 71 # Start 72 await self.devices[0].power_on() 73 await self.devices[1].power_on() 74 75 # Connect the two devices 76 await self.devices[0].connect(self.devices[1].random_address) 77 78 # Check the post conditions 79 assert self.connections[0] is not None 80 assert self.connections[1] is not None 81 82 def __getitem__(self, index: int) -> Device: 83 return self.devices[index] 84 85 @classmethod 86 async def create_with_connection(cls: Type[Self]) -> Self: 87 devices = cls() 88 await devices.setup_connection() 89 return devices 90 91 92# ----------------------------------------------------------------------------- 93async def async_barrier(): 94 ready = asyncio.get_running_loop().create_future() 95 asyncio.get_running_loop().call_soon(ready.set_result, None) 96 await ready 97