# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Bumble Tool # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import os import random import re import humanize from typing import Optional, Union from collections import OrderedDict import click from prettytable import PrettyTable from prompt_toolkit import Application from prompt_toolkit.history import FileHistory from prompt_toolkit.completion import Completer, Completion, NestedCompleter from prompt_toolkit.key_binding import KeyBindings from prompt_toolkit.formatted_text import ANSI from prompt_toolkit.styles import Style from prompt_toolkit.filters import Condition from prompt_toolkit.widgets import TextArea, Frame from prompt_toolkit.widgets.toolbars import FormattedTextToolbar from prompt_toolkit.data_structures import Point from prompt_toolkit.layout import ( Layout, HSplit, Window, CompletionsMenu, Float, FormattedTextControl, FloatContainer, ConditionalContainer, Dimension, ) from bumble import __version__ import bumble.core from bumble import colors from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer from bumble.utils import AsyncRunner from bumble.transport import open_transport_or_link from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor from bumble.gatt_client import CharacteristicProxy from bumble.hci import ( Address, HCI_Constant, HCI_LE_1M_PHY, HCI_LE_2M_PHY, HCI_LE_CODED_PHY, ) # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- BUMBLE_USER_DIR = os.path.expanduser('~/.bumble') DEFAULT_RSSI_BAR_WIDTH = 20 DEFAULT_CONNECTION_TIMEOUT = 30.0 DISPLAY_MIN_RSSI = -100 DISPLAY_MAX_RSSI = -30 RSSI_MONITOR_INTERVAL = 5.0 # Seconds # ----------------------------------------------------------------------------- # Utils # ----------------------------------------------------------------------------- def le_phy_name(phy_id): return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get( phy_id, HCI_Constant.le_phy_name(phy_id) ) def rssi_bar(rssi): blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉'] bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI) bar_width = min(max(bar_width, 0), 1) bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8) bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8] return f'{rssi:4} {bar_blocks}' def parse_phys(phys): if phys.lower() == '*': return None phy_list = [] elements = phys.lower().split(',') for element in elements: if element == '1m': phy_list.append(HCI_LE_1M_PHY) elif element == '2m': phy_list.append(HCI_LE_2M_PHY) elif element == 'coded': phy_list.append(HCI_LE_CODED_PHY) else: raise ValueError('invalid PHY name') return phy_list # ----------------------------------------------------------------------------- # Console App # ----------------------------------------------------------------------------- class ConsoleApp: connected_peer: Optional[Peer] def __init__(self): self.known_addresses = set() self.known_remote_attributes = [] self.known_local_attributes = [] self.device = None self.connected_peer = None self.top_tab = 'device' self.monitor_rssi = False self.connection_rssi = None style = Style.from_dict( { 'output-field': 'bg:#000044 #ffffff', 'input-field': 'bg:#000000 #ffffff', 'line': '#004400', 'error': 'fg:ansired', } ) class LiveCompleter(Completer): def __init__(self, words): self.words = words def get_completions(self, document, complete_event): prefix = document.text_before_cursor.upper() for word in [x for x in self.words if x.upper().startswith(prefix)]: yield Completion(word, start_position=-len(prefix)) def make_completer(): return NestedCompleter.from_nested_dict( { 'scan': {'on': None, 'off': None, 'clear': None}, 'advertise': {'on': None, 'off': None}, 'rssi': {'on': None, 'off': None}, 'show': { 'scan': None, 'log': None, 'device': None, 'local-services': None, 'remote-services': None, 'local-values': None, 'remote-values': None, }, 'filter': { 'address': None, }, 'connect': LiveCompleter(self.known_addresses), 'update-parameters': None, 'encrypt': None, 'disconnect': None, 'discover': {'services': None, 'attributes': None}, 'request-mtu': None, 'read': LiveCompleter(self.known_remote_attributes), 'write': LiveCompleter(self.known_remote_attributes), 'local-write': LiveCompleter(self.known_local_attributes), 'subscribe': LiveCompleter(self.known_remote_attributes), 'unsubscribe': LiveCompleter(self.known_remote_attributes), 'set-phy': {'1m': None, '2m': None, 'coded': None}, 'set-default-phy': None, 'quit': None, 'exit': None, } ) self.input_field = TextArea( height=1, prompt="> ", multiline=False, wrap_lines=False, completer=make_completer(), history=FileHistory(os.path.join(BUMBLE_USER_DIR, 'history')), ) self.input_field.accept_handler = self.accept_input self.output_height = Dimension(min=7, max=7, weight=1) self.output_lines = [] self.output = FormattedTextControl( get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)) ) self.output_max_lines = 20 self.scan_results_text = FormattedTextControl() self.local_services_text = FormattedTextControl() self.remote_services_text = FormattedTextControl() self.device_text = FormattedTextControl() self.log_text = FormattedTextControl( get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)) ) self.local_values_text = FormattedTextControl() self.remote_values_text = FormattedTextControl() self.log_height = Dimension(min=7, weight=4) self.log_max_lines = 100 self.log_lines = [] container = HSplit( [ ConditionalContainer( Frame(Window(self.scan_results_text), title='Scan Results'), filter=Condition(lambda: self.top_tab == 'scan'), ), ConditionalContainer( Frame(Window(self.local_services_text), title='Local Services'), filter=Condition(lambda: self.top_tab == 'local-services'), ), ConditionalContainer( Frame(Window(self.local_values_text), title='Local Values'), filter=Condition(lambda: self.top_tab == 'local-values'), ), ConditionalContainer( Frame(Window(self.remote_services_text), title='Remote Services'), filter=Condition(lambda: self.top_tab == 'remote-services'), ), ConditionalContainer( Frame(Window(self.remote_values_text), title='Remote Values'), filter=Condition(lambda: self.top_tab == 'remote-values'), ), ConditionalContainer( Frame(Window(self.log_text, height=self.log_height), title='Log'), filter=Condition(lambda: self.top_tab == 'log'), ), ConditionalContainer( Frame(Window(self.device_text), title='Device'), filter=Condition(lambda: self.top_tab == 'device'), ), Frame(Window(self.output, height=self.output_height)), FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'), self.input_field, ] ) container = FloatContainer( container, floats=[ Float( xcursor=True, ycursor=True, content=CompletionsMenu(max_height=16, scroll_offset=1), ), ], ) layout = Layout(container, focused_element=self.input_field) key_bindings = KeyBindings() @key_bindings.add("c-c") @key_bindings.add("c-q") def _(event): event.app.exit() # pylint: disable=invalid-name self.ui = Application( layout=layout, style=style, key_bindings=key_bindings, full_screen=True ) async def run_async(self, device_config, transport): rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop()) async with await open_transport_or_link(transport) as (hci_source, hci_sink): if device_config: self.device = Device.from_config_file_with_hci( device_config, hci_source, hci_sink ) else: random_address = Address.generate_static_address() self.append_to_log(f"Setting random address: {random_address}") self.device = Device.with_hci( 'Bumble', random_address, hci_source, hci_sink ) self.device.listener = DeviceListener(self) await self.device.power_on() self.show_device(self.device) self.show_local_services(self.device.gatt_server.attributes) # Run the UI await self.ui.run_async() rssi_monitoring_task.cancel() def add_known_address(self, address): self.known_addresses.add(address) def accept_input(self, _): if len(self.input_field.text) == 0: return self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False) self.ui.create_background_task(self.command(self.input_field.text)) def get_status_bar_text(self): scanning = "ON" if self.device and self.device.is_scanning else "OFF" connection_state = 'NONE' encryption_state = '' att_mtu = '' rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi) if self.device: if self.device.is_le_connecting: connection_state = 'CONNECTING' elif self.connected_peer: connection = self.connected_peer.connection connection_parameters = ( f'{connection.parameters.connection_interval}/' f'{connection.parameters.peripheral_latency}/' f'{connection.parameters.supervision_timeout}' ) if connection.transport == BT_LE_TRANSPORT: phy_state = ( f' RX={le_phy_name(connection.phy.rx_phy)}/' f'TX={le_phy_name(connection.phy.tx_phy)}' ) else: phy_state = '' connection_state = ( f'{connection.peer_address} ' f'{connection_parameters} ' f'{connection.data_length}' f'{phy_state}' ) encryption_state = ( 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED' ) att_mtu = f'ATT_MTU: {connection.att_mtu}' return [ ('ansigreen', f' SCAN: {scanning} '), ('', ' '), ('ansiblue', f' CONNECTION: {connection_state} '), ('', ' '), ('ansimagenta', f' {encryption_state} '), ('', ' '), ('ansicyan', f' {att_mtu} '), ('', ' '), ('ansiyellow', f' {rssi} '), ] def show_error(self, title, details=None): appended = [('class:error', title)] if details: appended.append(('', f' {details}')) self.append_to_output(appended) def show_scan_results(self, scan_results): max_lines = 40 # TEMP lines = [] keys = list(scan_results.keys())[:max_lines] for key in keys: lines.append(scan_results[key].to_display_string()) self.scan_results_text.text = ANSI('\n'.join(lines)) self.ui.invalidate() def show_remote_services(self, services): lines = [] del self.known_remote_attributes[:] for service in services: lines.append(("ansicyan", f"{service}\n")) for characteristic in service.characteristics: lines.append(('ansimagenta', f' {characteristic} + \n')) self.known_remote_attributes.append( f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}' ) self.known_remote_attributes.append( f'*.{characteristic.uuid.to_hex_str()}' ) self.known_remote_attributes.append(f'#{characteristic.handle:X}') for descriptor in characteristic.descriptors: lines.append(("ansigreen", f" {descriptor}\n")) self.remote_services_text.text = lines self.ui.invalidate() def show_local_services(self, attributes): lines = [] del self.known_local_attributes[:] for attribute in attributes: if isinstance(attribute, Service): # Save the most recent service for use later service = attribute lines.append(("ansicyan", f"{attribute}\n")) elif isinstance(attribute, Characteristic): # CharacteristicDeclaration includes all info from Characteristic # no need to print it twice continue elif isinstance(attribute, CharacteristicDeclaration): # Save the most recent characteristic declaration for use later characteristic_declaration = attribute self.known_local_attributes.append( f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}' ) self.known_local_attributes.append( f'#{attribute.characteristic.handle:X}' ) lines.append(("ansimagenta", f" {attribute}\n")) elif isinstance(attribute, Descriptor): self.known_local_attributes.append( f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}' ) self.known_local_attributes.append(f'#{attribute.handle:X}') lines.append(("ansigreen", f" {attribute}\n")) else: lines.append(("ansiyellow", f"{attribute}\n")) self.local_services_text.text = lines self.ui.invalidate() def show_device(self, device): lines = [] lines.append(('ansicyan', 'Bumble Version: ')) lines.append(('', f'{__version__}\n')) lines.append(('ansicyan', 'Name: ')) lines.append(('', f'{device.name}\n')) lines.append(('ansicyan', 'Public Address: ')) lines.append(('', f'{device.public_address}\n')) lines.append(('ansicyan', 'Random Address: ')) lines.append(('', f'{device.random_address}\n')) lines.append(('ansicyan', 'LE Enabled: ')) lines.append(('', f'{device.le_enabled}\n')) lines.append(('ansicyan', 'Classic Enabled: ')) lines.append(('', f'{device.classic_enabled}\n')) lines.append(('ansicyan', 'Classic SC Enabled: ')) lines.append(('', f'{device.classic_sc_enabled}\n')) lines.append(('ansicyan', 'Classic SSP Enabled: ')) lines.append(('', f'{device.classic_ssp_enabled}\n')) lines.append(('ansicyan', 'Classic Class: ')) lines.append(('', f'{device.class_of_device}\n')) lines.append(('ansicyan', 'Discoverable: ')) lines.append(('', f'{device.discoverable}\n')) lines.append(('ansicyan', 'Connectable: ')) lines.append(('', f'{device.connectable}\n')) lines.append(('ansicyan', 'Advertising Data: ')) lines.append(('', f'{device.advertising_data}\n')) lines.append(('ansicyan', 'Scan Response Data: ')) lines.append(('', f'{device.scan_response_data}\n')) advertising_interval = ( device.advertising_interval_min if device.advertising_interval_min == device.advertising_interval_max else ( f'{device.advertising_interval_min} to ' f'{device.advertising_interval_max}' ) ) lines.append(('ansicyan', 'Advertising Interval: ')) lines.append(('', f'{advertising_interval}\n')) self.device_text.text = lines self.ui.invalidate() def append_to_output(self, line, invalidate=True): if isinstance(line, str): line = [('', line)] self.output_lines = self.output_lines[-self.output_max_lines :] self.output_lines.append(line) formatted_text = [] for line in self.output_lines: formatted_text += line formatted_text.append(('', '\n')) self.output.text = formatted_text if invalidate: self.ui.invalidate() def append_to_log(self, lines, invalidate=True): self.log_lines.extend(lines.split('\n')) self.log_lines = self.log_lines[-self.log_max_lines :] self.log_text.text = ANSI('\n'.join(self.log_lines)) if invalidate: self.ui.invalidate() async def discover_services(self): if not self.connected_peer: self.show_error('not connected') return self.append_to_output('Service Discovery starting...') await self.connected_peer.discover_all() self.append_to_output('Service Discovery done!') async def discover_attributes(self): if not self.connected_peer: self.show_error('not connected') return # Discover all attributes self.append_to_output('discovering attributes...') attributes = await self.connected_peer.discover_attributes() self.append_to_output(f'discovered {len(attributes)} attributes...') self.show_attributes(attributes) def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]: if not self.connected_peer: return None parts = param.split('.') if len(parts) == 2: service_uuid = UUID(parts[0]) if parts[0] != '*' else None characteristic_uuid = UUID(parts[1]) for service in self.connected_peer.services: if service_uuid is None or service.uuid == service_uuid: for characteristic in service.characteristics: if characteristic.uuid == characteristic_uuid: return characteristic elif len(parts) == 1: if parts[0].startswith('#'): attribute_handle = int(f'{parts[0][1:]}', 16) for service in self.connected_peer.services: for characteristic in service.characteristics: if characteristic.handle == attribute_handle: return characteristic return None def find_local_attribute( self, param ) -> Optional[Union[Characteristic, Descriptor]]: parts = param.split('.') if len(parts) == 3: service_uuid = UUID(parts[0]) characteristic_uuid = UUID(parts[1]) descriptor_uuid = UUID(parts[2]) return self.device.gatt_server.get_descriptor_attribute( service_uuid, characteristic_uuid, descriptor_uuid ) if len(parts) == 2: service_uuid = UUID(parts[0]) characteristic_uuid = UUID(parts[1]) characteristic_attributes = ( self.device.gatt_server.get_characteristic_attributes( service_uuid, characteristic_uuid ) ) if characteristic_attributes: return characteristic_attributes[1] return None elif len(parts) == 1: if parts[0].startswith('#'): attribute_handle = int(f'{parts[0][1:]}', 16) attribute = self.device.gatt_server.get_attribute(attribute_handle) if isinstance(attribute, (Characteristic, Descriptor)): return attribute return None return None async def rssi_monitor_loop(self): while True: if self.monitor_rssi and self.connected_peer: self.connection_rssi = await self.connected_peer.connection.get_rssi() await asyncio.sleep(RSSI_MONITOR_INTERVAL) async def command(self, command): try: (keyword, *params) = command.strip().split(' ') keyword = keyword.replace('-', '_').lower() handler = getattr(self, f'do_{keyword}', None) if handler: await handler(params) self.ui.invalidate() else: self.show_error('unknown command', keyword) except Exception as error: self.show_error(str(error)) async def do_scan(self, params): if len(params) == 0: # Toggle scanning if self.device.is_scanning: await self.device.stop_scanning() else: await self.device.start_scanning() elif params[0] == 'on': if len(params) == 2: if not params[1].startswith("filter="): self.show_error( 'invalid syntax', 'expected address filter=key1:value1,key2:value,... ' 'available filters: address', ) # regex: (word):(any char except ,) matches = re.findall(r"(\w+):([^,]+)", params[1]) for match in matches: if match[0] == "address": self.device.listener.address_filter = match[1] await self.device.start_scanning() self.top_tab = 'scan' elif params[0] == 'off': await self.device.stop_scanning() elif params[0] == 'clear': self.device.listener.scan_results.clear() self.known_addresses.clear() self.show_scan_results(self.device.listener.scan_results) else: self.show_error('unsupported arguments for scan command') async def do_rssi(self, params): if len(params) == 0: # Toggle monitoring self.monitor_rssi = not self.monitor_rssi elif params[0] == 'on': self.monitor_rssi = True elif params[0] == 'off': self.monitor_rssi = False else: self.show_error('unsupported arguments for rssi command') async def do_connect(self, params): if len(params) != 1 and len(params) != 2: self.show_error('invalid syntax', 'expected connect
[phys]') return if len(params) == 1: phys = None else: phys = parse_phys(params[1]) if phys is None: connection_parameters_preferences = None else: connection_parameters_preferences = { phy: ConnectionParametersPreferences() for phy in phys } if self.device.is_scanning: await self.device.stop_scanning() self.append_to_output('connecting...') try: await self.device.connect( params[0], connection_parameters_preferences=connection_parameters_preferences, timeout=DEFAULT_CONNECTION_TIMEOUT, ) self.top_tab = 'services' except bumble.core.TimeoutError: self.show_error('connection timed out') async def do_disconnect(self, _): if self.device.is_le_connecting: await self.device.cancel_connection() else: if not self.connected_peer: self.show_error('not connected') return await self.connected_peer.connection.disconnect() async def do_update_parameters(self, params): if len(params) != 1 or len(params[0].split('/')) != 3: self.show_error( 'invalid syntax', 'expected update-parameters