# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import sys
import os
from bumble.colors import color
from bumble.device import Device
from bumble.controller import Controller
from bumble.hci import Address
from bumble.link import LocalLink
from bumble.transport import open_transport_or_link


# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
    """Device listener that prints one line per advertisement it is given."""

    # Short labels indexed by the numeric address type of the advertiser.
    ADDRESS_TYPE_LABELS = ('P', 'R', 'PI', 'RI')

    def on_advertisement(self, advertisement):
        """Print the address, address-type label, RSSI and data of an advertisement.

        The address is shown in yellow when the advertisement is connectable and
        in red otherwise. The type label is shown in green when it starts with
        'P' and in cyan otherwise.
        """
        address_type = advertisement.address.address_type
        # Fall back to '?' instead of raising IndexError (or silently wrapping
        # around on a negative index) when the address type is not one of the
        # four values the label table knows about.
        if 0 <= address_type < len(self.ADDRESS_TYPE_LABELS):
            address_type_string = self.ADDRESS_TYPE_LABELS[address_type]
        else:
            address_type_string = '?'

        address_color = 'yellow' if advertisement.is_connectable else 'red'
        type_color = 'green' if address_type_string.startswith('P') else 'cyan'

        print(
            f'>>> {color(advertisement.address, address_color)} '
            f'[{color(address_type_string, type_color)}]: '
            f'RSSI={advertisement.rssi}, {advertisement.data}'
        )


# -----------------------------------------------------------------------------
async def main() -> None:
    """Run a controller on the given transport next to a local scanning device.

    Expects exactly one command-line argument, the transport spec handed to
    `open_transport_or_link`. With any other argument count, usage help is
    printed and the coroutine returns without opening anything.
    """
    if len(sys.argv) != 2:
        # The usage line names the same script as the example line below it.
        print('Usage: run_controller_with_scanner.py <transport-spec>')
        print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
        return

    print('>>> connecting to HCI...')
    async with await open_transport_or_link(sys.argv[1]) as hci_transport:
        print('>>> connected')

        # Create a local link
        link = LocalLink()

        # Create a first controller using the packet source/sink as its host
        # interface. The object is not used again in this function; it is
        # constructed for its attachment to the transport and the link.
        controller1 = Controller(
            'C1',
            host_source=hci_transport.source,
            host_sink=hci_transport.sink,
            link=link,
            public_address='E0:E1:E2:E3:E4:E5',
        )

        # Create a second controller using the same link
        controller2 = Controller('C2', link=link)

        # Create a device with a scanner listener; the second controller is
        # passed as both of the device's HCI arguments.
        device = Device.with_hci(
            'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
        )
        device.listener = ScannerListener()
        await device.power_on()
        await device.start_scanning()

        # Stay inside the transport context until the source reports termination
        await hci_transport.source.wait_for_termination()


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())