1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Bumble Tool 17# ----------------------------------------------------------------------------- 18 19# ----------------------------------------------------------------------------- 20# Imports 21# ----------------------------------------------------------------------------- 22import asyncio 23import logging 24import os 25import random 26import re 27import humanize 28from typing import Optional, Union 29from collections import OrderedDict 30 31import click 32from prettytable import PrettyTable 33 34from prompt_toolkit import Application 35from prompt_toolkit.history import FileHistory 36from prompt_toolkit.completion import Completer, Completion, NestedCompleter 37from prompt_toolkit.key_binding import KeyBindings 38from prompt_toolkit.formatted_text import ANSI 39from prompt_toolkit.styles import Style 40from prompt_toolkit.filters import Condition 41from prompt_toolkit.widgets import TextArea, Frame 42from prompt_toolkit.widgets.toolbars import FormattedTextToolbar 43from prompt_toolkit.data_structures import Point 44from prompt_toolkit.layout import ( 45 Layout, 46 HSplit, 47 Window, 48 CompletionsMenu, 49 Float, 50 FormattedTextControl, 51 FloatContainer, 52 ConditionalContainer, 53 Dimension, 54) 55 56from bumble import __version__ 57import bumble.core 58from bumble import colors 59from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT 60from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer 61from bumble.utils import AsyncRunner 62from bumble.transport import open_transport_or_link 63from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor 64from bumble.gatt_client import CharacteristicProxy 65from bumble.hci import ( 66 Address, 67 HCI_Constant, 68 HCI_LE_1M_PHY, 69 HCI_LE_2M_PHY, 70 HCI_LE_CODED_PHY, 71) 72 73 74# ----------------------------------------------------------------------------- 75# Constants 76# ----------------------------------------------------------------------------- 77BUMBLE_USER_DIR = os.path.expanduser('~/.bumble') 78DEFAULT_RSSI_BAR_WIDTH = 20 79DEFAULT_CONNECTION_TIMEOUT = 30.0 80DISPLAY_MIN_RSSI = -100 81DISPLAY_MAX_RSSI = -30 82RSSI_MONITOR_INTERVAL = 5.0 # Seconds 83 84 85# ----------------------------------------------------------------------------- 86# Utils 87# ----------------------------------------------------------------------------- 88 89 90def le_phy_name(phy_id): 91 return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get( 92 phy_id, HCI_Constant.le_phy_name(phy_id) 93 ) 94 95 96def rssi_bar(rssi): 97 blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉'] 98 bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI) 99 bar_width = min(max(bar_width, 0), 1) 100 bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8) 101 bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8] 102 return f'{rssi:4} {bar_blocks}' 103 104 105def parse_phys(phys): 106 if phys.lower() == '*': 107 return None 108 109 phy_list = [] 110 elements = phys.lower().split(',') 111 for element in elements: 112 if element == '1m': 113 phy_list.append(HCI_LE_1M_PHY) 114 elif element == '2m': 115 phy_list.append(HCI_LE_2M_PHY) 116 elif element == 'coded': 117 phy_list.append(HCI_LE_CODED_PHY) 118 else: 119 raise ValueError('invalid PHY name') 120 return phy_list 121 122 123# ----------------------------------------------------------------------------- 124# Console App 125# ----------------------------------------------------------------------------- 126class ConsoleApp: 127 connected_peer: Optional[Peer] 128 129 def __init__(self): 130 self.known_addresses = set() 131 self.known_remote_attributes = [] 132 self.known_local_attributes = [] 133 self.device = None 134 self.connected_peer = None 135 self.top_tab = 'device' 136 self.monitor_rssi = False 137 self.connection_rssi = None 138 139 style = Style.from_dict( 140 { 141 'output-field': 'bg:#000044 #ffffff', 142 'input-field': 'bg:#000000 #ffffff', 143 'line': '#004400', 144 'error': 'fg:ansired', 145 } 146 ) 147 148 class LiveCompleter(Completer): 149 def __init__(self, words): 150 self.words = words 151 152 def get_completions(self, document, complete_event): 153 prefix = document.text_before_cursor.upper() 154 for word in [x for x in self.words if x.upper().startswith(prefix)]: 155 yield Completion(word, start_position=-len(prefix)) 156 157 def make_completer(): 158 return NestedCompleter.from_nested_dict( 159 { 160 'scan': {'on': None, 'off': None, 'clear': None}, 161 'advertise': {'on': None, 'off': None}, 162 'rssi': {'on': None, 'off': None}, 163 'show': { 164 'scan': None, 165 'log': None, 166 'device': None, 167 'local-services': None, 168 'remote-services': None, 169 'local-values': None, 170 'remote-values': None, 171 }, 172 'filter': { 173 'address': None, 174 }, 175 'connect': LiveCompleter(self.known_addresses), 176 'update-parameters': None, 177 'encrypt': None, 178 'disconnect': None, 179 'discover': {'services': None, 'attributes': None}, 180 'request-mtu': None, 181 'read': LiveCompleter(self.known_remote_attributes), 182 'write': LiveCompleter(self.known_remote_attributes), 183 'local-write': LiveCompleter(self.known_local_attributes), 184 'subscribe': LiveCompleter(self.known_remote_attributes), 185 'unsubscribe': LiveCompleter(self.known_remote_attributes), 186 'set-phy': {'1m': None, '2m': None, 'coded': None}, 187 'set-default-phy': None, 188 'quit': None, 189 'exit': None, 190 } 191 ) 192 193 self.input_field = TextArea( 194 height=1, 195 prompt="> ", 196 multiline=False, 197 wrap_lines=False, 198 completer=make_completer(), 199 history=FileHistory(os.path.join(BUMBLE_USER_DIR, 'history')), 200 ) 201 202 self.input_field.accept_handler = self.accept_input 203 204 self.output_height = Dimension(min=7, max=7, weight=1) 205 self.output_lines = [] 206 self.output = FormattedTextControl( 207 get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)) 208 ) 209 self.output_max_lines = 20 210 self.scan_results_text = FormattedTextControl() 211 self.local_services_text = FormattedTextControl() 212 self.remote_services_text = FormattedTextControl() 213 self.device_text = FormattedTextControl() 214 self.log_text = FormattedTextControl( 215 get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)) 216 ) 217 self.local_values_text = FormattedTextControl() 218 self.remote_values_text = FormattedTextControl() 219 self.log_height = Dimension(min=7, weight=4) 220 self.log_max_lines = 100 221 self.log_lines = [] 222 223 container = HSplit( 224 [ 225 ConditionalContainer( 226 Frame(Window(self.scan_results_text), title='Scan Results'), 227 filter=Condition(lambda: self.top_tab == 'scan'), 228 ), 229 ConditionalContainer( 230 Frame(Window(self.local_services_text), title='Local Services'), 231 filter=Condition(lambda: self.top_tab == 'local-services'), 232 ), 233 ConditionalContainer( 234 Frame(Window(self.local_values_text), title='Local Values'), 235 filter=Condition(lambda: self.top_tab == 'local-values'), 236 ), 237 ConditionalContainer( 238 Frame(Window(self.remote_services_text), title='Remote Services'), 239 filter=Condition(lambda: self.top_tab == 'remote-services'), 240 ), 241 ConditionalContainer( 242 Frame(Window(self.remote_values_text), title='Remote Values'), 243 filter=Condition(lambda: self.top_tab == 'remote-values'), 244 ), 245 ConditionalContainer( 246 Frame(Window(self.log_text, height=self.log_height), title='Log'), 247 filter=Condition(lambda: self.top_tab == 'log'), 248 ), 249 ConditionalContainer( 250 Frame(Window(self.device_text), title='Device'), 251 filter=Condition(lambda: self.top_tab == 'device'), 252 ), 253 Frame(Window(self.output, height=self.output_height)), 254 FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'), 255 self.input_field, 256 ] 257 ) 258 259 container = FloatContainer( 260 container, 261 floats=[ 262 Float( 263 xcursor=True, 264 ycursor=True, 265 content=CompletionsMenu(max_height=16, scroll_offset=1), 266 ), 267 ], 268 ) 269 270 layout = Layout(container, focused_element=self.input_field) 271 272 key_bindings = KeyBindings() 273 274 @key_bindings.add("c-c") 275 @key_bindings.add("c-q") 276 def _(event): 277 event.app.exit() 278 279 # pylint: disable=invalid-name 280 self.ui = Application( 281 layout=layout, style=style, key_bindings=key_bindings, full_screen=True 282 ) 283 284 async def run_async(self, device_config, transport): 285 rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop()) 286 287 async with await open_transport_or_link(transport) as (hci_source, hci_sink): 288 if device_config: 289 self.device = Device.from_config_file_with_hci( 290 device_config, hci_source, hci_sink 291 ) 292 else: 293 random_address = Address.generate_static_address() 294 self.append_to_log(f"Setting random address: {random_address}") 295 self.device = Device.with_hci( 296 'Bumble', random_address, hci_source, hci_sink 297 ) 298 self.device.listener = DeviceListener(self) 299 await self.device.power_on() 300 self.show_device(self.device) 301 self.show_local_services(self.device.gatt_server.attributes) 302 303 # Run the UI 304 await self.ui.run_async() 305 306 rssi_monitoring_task.cancel() 307 308 def add_known_address(self, address): 309 self.known_addresses.add(address) 310 311 def accept_input(self, _): 312 if len(self.input_field.text) == 0: 313 return 314 self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False) 315 self.ui.create_background_task(self.command(self.input_field.text)) 316 317 def get_status_bar_text(self): 318 scanning = "ON" if self.device and self.device.is_scanning else "OFF" 319 320 connection_state = 'NONE' 321 encryption_state = '' 322 att_mtu = '' 323 rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi) 324 325 if self.device: 326 if self.device.is_le_connecting: 327 connection_state = 'CONNECTING' 328 elif self.connected_peer: 329 connection = self.connected_peer.connection 330 connection_parameters = ( 331 f'{connection.parameters.connection_interval}/' 332 f'{connection.parameters.peripheral_latency}/' 333 f'{connection.parameters.supervision_timeout}' 334 ) 335 if connection.transport == BT_LE_TRANSPORT: 336 phy_state = ( 337 f' RX={le_phy_name(connection.phy.rx_phy)}/' 338 f'TX={le_phy_name(connection.phy.tx_phy)}' 339 ) 340 else: 341 phy_state = '' 342 connection_state = ( 343 f'{connection.peer_address} ' 344 f'{connection_parameters} ' 345 f'{connection.data_length}' 346 f'{phy_state}' 347 ) 348 encryption_state = ( 349 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED' 350 ) 351 att_mtu = f'ATT_MTU: {connection.att_mtu}' 352 353 return [ 354 ('ansigreen', f' SCAN: {scanning} '), 355 ('', ' '), 356 ('ansiblue', f' CONNECTION: {connection_state} '), 357 ('', ' '), 358 ('ansimagenta', f' {encryption_state} '), 359 ('', ' '), 360 ('ansicyan', f' {att_mtu} '), 361 ('', ' '), 362 ('ansiyellow', f' {rssi} '), 363 ] 364 365 def show_error(self, title, details=None): 366 appended = [('class:error', title)] 367 if details: 368 appended.append(('', f' {details}')) 369 self.append_to_output(appended) 370 371 def show_scan_results(self, scan_results): 372 max_lines = 40 # TEMP 373 lines = [] 374 keys = list(scan_results.keys())[:max_lines] 375 for key in keys: 376 lines.append(scan_results[key].to_display_string()) 377 self.scan_results_text.text = ANSI('\n'.join(lines)) 378 self.ui.invalidate() 379 380 def show_remote_services(self, services): 381 lines = [] 382 del self.known_remote_attributes[:] 383 for service in services: 384 lines.append(("ansicyan", f"{service}\n")) 385 386 for characteristic in service.characteristics: 387 lines.append(('ansimagenta', f' {characteristic} + \n')) 388 self.known_remote_attributes.append( 389 f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}' 390 ) 391 self.known_remote_attributes.append( 392 f'*.{characteristic.uuid.to_hex_str()}' 393 ) 394 self.known_remote_attributes.append(f'#{characteristic.handle:X}') 395 for descriptor in characteristic.descriptors: 396 lines.append(("ansigreen", f" {descriptor}\n")) 397 398 self.remote_services_text.text = lines 399 self.ui.invalidate() 400 401 def show_local_services(self, attributes): 402 lines = [] 403 del self.known_local_attributes[:] 404 for attribute in attributes: 405 if isinstance(attribute, Service): 406 # Save the most recent service for use later 407 service = attribute 408 lines.append(("ansicyan", f"{attribute}\n")) 409 elif isinstance(attribute, Characteristic): 410 # CharacteristicDeclaration includes all info from Characteristic 411 # no need to print it twice 412 continue 413 elif isinstance(attribute, CharacteristicDeclaration): 414 # Save the most recent characteristic declaration for use later 415 characteristic_declaration = attribute 416 self.known_local_attributes.append( 417 f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}' 418 ) 419 self.known_local_attributes.append( 420 f'#{attribute.characteristic.handle:X}' 421 ) 422 lines.append(("ansimagenta", f" {attribute}\n")) 423 elif isinstance(attribute, Descriptor): 424 self.known_local_attributes.append( 425 f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}' 426 ) 427 self.known_local_attributes.append(f'#{attribute.handle:X}') 428 lines.append(("ansigreen", f" {attribute}\n")) 429 else: 430 lines.append(("ansiyellow", f"{attribute}\n")) 431 432 self.local_services_text.text = lines 433 self.ui.invalidate() 434 435 def show_device(self, device): 436 lines = [] 437 438 lines.append(('ansicyan', 'Bumble Version: ')) 439 lines.append(('', f'{__version__}\n')) 440 lines.append(('ansicyan', 'Name: ')) 441 lines.append(('', f'{device.name}\n')) 442 lines.append(('ansicyan', 'Public Address: ')) 443 lines.append(('', f'{device.public_address}\n')) 444 lines.append(('ansicyan', 'Random Address: ')) 445 lines.append(('', f'{device.random_address}\n')) 446 lines.append(('ansicyan', 'LE Enabled: ')) 447 lines.append(('', f'{device.le_enabled}\n')) 448 lines.append(('ansicyan', 'Classic Enabled: ')) 449 lines.append(('', f'{device.classic_enabled}\n')) 450 lines.append(('ansicyan', 'Classic SC Enabled: ')) 451 lines.append(('', f'{device.classic_sc_enabled}\n')) 452 lines.append(('ansicyan', 'Classic SSP Enabled: ')) 453 lines.append(('', f'{device.classic_ssp_enabled}\n')) 454 lines.append(('ansicyan', 'Classic Class: ')) 455 lines.append(('', f'{device.class_of_device}\n')) 456 lines.append(('ansicyan', 'Discoverable: ')) 457 lines.append(('', f'{device.discoverable}\n')) 458 lines.append(('ansicyan', 'Connectable: ')) 459 lines.append(('', f'{device.connectable}\n')) 460 lines.append(('ansicyan', 'Advertising Data: ')) 461 lines.append(('', f'{device.advertising_data}\n')) 462 lines.append(('ansicyan', 'Scan Response Data: ')) 463 lines.append(('', f'{device.scan_response_data}\n')) 464 advertising_interval = ( 465 device.advertising_interval_min 466 if device.advertising_interval_min == device.advertising_interval_max 467 else ( 468 f'{device.advertising_interval_min} to ' 469 f'{device.advertising_interval_max}' 470 ) 471 ) 472 lines.append(('ansicyan', 'Advertising Interval: ')) 473 lines.append(('', f'{advertising_interval}\n')) 474 475 self.device_text.text = lines 476 self.ui.invalidate() 477 478 def append_to_output(self, line, invalidate=True): 479 if isinstance(line, str): 480 line = [('', line)] 481 self.output_lines = self.output_lines[-self.output_max_lines :] 482 self.output_lines.append(line) 483 formatted_text = [] 484 for line in self.output_lines: 485 formatted_text += line 486 formatted_text.append(('', '\n')) 487 self.output.text = formatted_text 488 if invalidate: 489 self.ui.invalidate() 490 491 def append_to_log(self, lines, invalidate=True): 492 self.log_lines.extend(lines.split('\n')) 493 self.log_lines = self.log_lines[-self.log_max_lines :] 494 self.log_text.text = ANSI('\n'.join(self.log_lines)) 495 if invalidate: 496 self.ui.invalidate() 497 498 async def discover_services(self): 499 if not self.connected_peer: 500 self.show_error('not connected') 501 return 502 503 self.append_to_output('Service Discovery starting...') 504 await self.connected_peer.discover_all() 505 self.append_to_output('Service Discovery done!') 506 507 async def discover_attributes(self): 508 if not self.connected_peer: 509 self.show_error('not connected') 510 return 511 512 # Discover all attributes 513 self.append_to_output('discovering attributes...') 514 attributes = await self.connected_peer.discover_attributes() 515 self.append_to_output(f'discovered {len(attributes)} attributes...') 516 517 self.show_attributes(attributes) 518 519 def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]: 520 if not self.connected_peer: 521 return None 522 parts = param.split('.') 523 if len(parts) == 2: 524 service_uuid = UUID(parts[0]) if parts[0] != '*' else None 525 characteristic_uuid = UUID(parts[1]) 526 for service in self.connected_peer.services: 527 if service_uuid is None or service.uuid == service_uuid: 528 for characteristic in service.characteristics: 529 if characteristic.uuid == characteristic_uuid: 530 return characteristic 531 elif len(parts) == 1: 532 if parts[0].startswith('#'): 533 attribute_handle = int(f'{parts[0][1:]}', 16) 534 for service in self.connected_peer.services: 535 for characteristic in service.characteristics: 536 if characteristic.handle == attribute_handle: 537 return characteristic 538 539 return None 540 541 def find_local_attribute( 542 self, param 543 ) -> Optional[Union[Characteristic, Descriptor]]: 544 parts = param.split('.') 545 if len(parts) == 3: 546 service_uuid = UUID(parts[0]) 547 characteristic_uuid = UUID(parts[1]) 548 descriptor_uuid = UUID(parts[2]) 549 return self.device.gatt_server.get_descriptor_attribute( 550 service_uuid, characteristic_uuid, descriptor_uuid 551 ) 552 if len(parts) == 2: 553 service_uuid = UUID(parts[0]) 554 characteristic_uuid = UUID(parts[1]) 555 characteristic_attributes = ( 556 self.device.gatt_server.get_characteristic_attributes( 557 service_uuid, characteristic_uuid 558 ) 559 ) 560 if characteristic_attributes: 561 return characteristic_attributes[1] 562 return None 563 elif len(parts) == 1: 564 if parts[0].startswith('#'): 565 attribute_handle = int(f'{parts[0][1:]}', 16) 566 attribute = self.device.gatt_server.get_attribute(attribute_handle) 567 if isinstance(attribute, (Characteristic, Descriptor)): 568 return attribute 569 return None 570 571 return None 572 573 async def rssi_monitor_loop(self): 574 while True: 575 if self.monitor_rssi and self.connected_peer: 576 self.connection_rssi = await self.connected_peer.connection.get_rssi() 577 await asyncio.sleep(RSSI_MONITOR_INTERVAL) 578 579 async def command(self, command): 580 try: 581 (keyword, *params) = command.strip().split(' ') 582 keyword = keyword.replace('-', '_').lower() 583 handler = getattr(self, f'do_{keyword}', None) 584 if handler: 585 await handler(params) 586 self.ui.invalidate() 587 else: 588 self.show_error('unknown command', keyword) 589 except Exception as error: 590 self.show_error(str(error)) 591 592 async def do_scan(self, params): 593 if len(params) == 0: 594 # Toggle scanning 595 if self.device.is_scanning: 596 await self.device.stop_scanning() 597 else: 598 await self.device.start_scanning() 599 elif params[0] == 'on': 600 if len(params) == 2: 601 if not params[1].startswith("filter="): 602 self.show_error( 603 'invalid syntax', 604 'expected address filter=key1:value1,key2:value,... ' 605 'available filters: address', 606 ) 607 # regex: (word):(any char except ,) 608 matches = re.findall(r"(\w+):([^,]+)", params[1]) 609 for match in matches: 610 if match[0] == "address": 611 self.device.listener.address_filter = match[1] 612 613 await self.device.start_scanning() 614 self.top_tab = 'scan' 615 elif params[0] == 'off': 616 await self.device.stop_scanning() 617 elif params[0] == 'clear': 618 self.device.listener.scan_results.clear() 619 self.known_addresses.clear() 620 self.show_scan_results(self.device.listener.scan_results) 621 else: 622 self.show_error('unsupported arguments for scan command') 623 624 async def do_rssi(self, params): 625 if len(params) == 0: 626 # Toggle monitoring 627 self.monitor_rssi = not self.monitor_rssi 628 elif params[0] == 'on': 629 self.monitor_rssi = True 630 elif params[0] == 'off': 631 self.monitor_rssi = False 632 else: 633 self.show_error('unsupported arguments for rssi command') 634 635 async def do_connect(self, params): 636 if len(params) != 1 and len(params) != 2: 637 self.show_error('invalid syntax', 'expected connect <address> [phys]') 638 return 639 640 if len(params) == 1: 641 phys = None 642 else: 643 phys = parse_phys(params[1]) 644 if phys is None: 645 connection_parameters_preferences = None 646 else: 647 connection_parameters_preferences = { 648 phy: ConnectionParametersPreferences() for phy in phys 649 } 650 651 if self.device.is_scanning: 652 await self.device.stop_scanning() 653 654 self.append_to_output('connecting...') 655 656 try: 657 await self.device.connect( 658 params[0], 659 connection_parameters_preferences=connection_parameters_preferences, 660 timeout=DEFAULT_CONNECTION_TIMEOUT, 661 ) 662 self.top_tab = 'services' 663 except bumble.core.TimeoutError: 664 self.show_error('connection timed out') 665 666 async def do_disconnect(self, _): 667 if self.device.is_le_connecting: 668 await self.device.cancel_connection() 669 else: 670 if not self.connected_peer: 671 self.show_error('not connected') 672 return 673 674 await self.connected_peer.connection.disconnect() 675 676 async def do_update_parameters(self, params): 677 if len(params) != 1 or len(params[0].split('/')) != 3: 678 self.show_error( 679 'invalid syntax', 680 'expected update-parameters <interval-min>-<interval-max>' 681 '/<max-latency>/<supervision>', 682 ) 683 return 684 685 if not self.connected_peer: 686 self.show_error('not connected') 687 return 688 689 connection_intervals, max_latency, supervision_timeout = params[0].split('/') 690 connection_interval_min, connection_interval_max = [ 691 int(x) for x in connection_intervals.split('-') 692 ] 693 max_latency = int(max_latency) 694 supervision_timeout = int(supervision_timeout) 695 await self.connected_peer.connection.update_parameters( 696 connection_interval_min, 697 connection_interval_max, 698 max_latency, 699 supervision_timeout, 700 ) 701 702 async def do_encrypt(self, _): 703 if not self.connected_peer: 704 self.show_error('not connected') 705 return 706 707 await self.connected_peer.connection.encrypt() 708 709 async def do_advertise(self, params): 710 if len(params) == 0: 711 # Toggle advertising 712 if self.device.is_advertising: 713 await self.device.stop_advertising() 714 else: 715 await self.device.start_advertising() 716 elif params[0] == 'on': 717 await self.device.start_advertising() 718 elif params[0] == 'off': 719 await self.device.stop_advertising() 720 else: 721 self.show_error('unsupported arguments for advertise command') 722 723 async def do_show(self, params): 724 if params: 725 if params[0] in { 726 'scan', 727 'log', 728 'device', 729 'local-services', 730 'remote-services', 731 'local-values', 732 'remote-values', 733 }: 734 self.top_tab = params[0] 735 self.ui.invalidate() 736 737 while self.top_tab == 'local-values': 738 await self.do_show_local_values() 739 await asyncio.sleep(1) 740 741 while self.top_tab == 'remote-values': 742 await self.do_show_remote_values() 743 await asyncio.sleep(1) 744 745 async def do_show_local_values(self): 746 prettytable = PrettyTable() 747 field_names = ["Service", "Characteristic", "Descriptor"] 748 749 # if there's no connections, add a column just for value 750 if not self.device.connections: 751 field_names.append("Value") 752 753 # if there are connections, add a column for each connection's value 754 for connection in self.device.connections.values(): 755 field_names.append(f"Connection {connection.handle}") 756 757 for attribute in self.device.gatt_server.attributes: 758 if isinstance(attribute, Characteristic): 759 service = self.device.gatt_server.get_attribute_group( 760 attribute.handle, Service 761 ) 762 if not service: 763 continue 764 values = [ 765 await attribute.read_value(connection) 766 for connection in self.device.connections.values() 767 ] 768 if not values: 769 values = [attribute.read_value(None)] 770 prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values) 771 772 elif isinstance(attribute, Descriptor): 773 service = self.device.gatt_server.get_attribute_group( 774 attribute.handle, Service 775 ) 776 if not service: 777 continue 778 characteristic = self.device.gatt_server.get_attribute_group( 779 attribute.handle, Characteristic 780 ) 781 if not characteristic: 782 continue 783 values = [ 784 await attribute.read_value(connection) 785 for connection in self.device.connections.values() 786 ] 787 if not values: 788 values = [await attribute.read_value(None)] 789 790 # TODO: future optimization: convert CCCD value to human readable string 791 792 prettytable.add_row( 793 [service.uuid, characteristic.uuid, attribute.type] + values 794 ) 795 796 prettytable.field_names = field_names 797 self.local_values_text.text = prettytable.get_string() 798 self.ui.invalidate() 799 800 async def do_show_remote_values(self): 801 prettytable = PrettyTable( 802 field_names=[ 803 "Connection", 804 "Service", 805 "Characteristic", 806 "Descriptor", 807 "Time", 808 "Value", 809 ] 810 ) 811 for connection in self.device.connections.values(): 812 for handle, (time, value) in connection.gatt_client.cached_values.items(): 813 row = [connection.handle] 814 attribute = connection.gatt_client.get_attributes(handle) 815 if not attribute: 816 continue 817 if len(attribute) == 3: 818 row.extend( 819 [attribute[0].uuid, attribute[1].uuid, attribute[2].type] 820 ) 821 elif len(attribute) == 2: 822 row.extend([attribute[0].uuid, attribute[1].uuid, ""]) 823 elif len(attribute) == 1: 824 row.extend([attribute[0].uuid, "", ""]) 825 else: 826 continue 827 828 row.extend([humanize.naturaltime(time), value]) 829 prettytable.add_row(row) 830 831 self.remote_values_text.text = prettytable.get_string() 832 self.ui.invalidate() 833 834 async def do_get_phy(self, _): 835 if not self.connected_peer: 836 self.show_error('not connected') 837 return 838 839 phy = await self.connected_peer.connection.get_phy() 840 self.append_to_output( 841 f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, ' 842 f'TX={HCI_Constant.le_phy_name(phy[1])}' 843 ) 844 845 async def do_request_mtu(self, params): 846 if len(params) != 1: 847 self.show_error('invalid syntax', 'expected request-mtu <mtu>') 848 return 849 850 if not self.connected_peer: 851 self.show_error('not connected') 852 return 853 854 await self.connected_peer.request_mtu(int(params[0])) 855 856 async def do_discover(self, params): 857 if not params: 858 self.show_error('invalid syntax', 'expected discover services|attributes') 859 return 860 861 discovery_type = params[0] 862 if discovery_type == 'services': 863 await self.discover_services() 864 elif discovery_type == 'attributes': 865 await self.discover_attributes() 866 867 async def do_read(self, params): 868 if len(params) != 1: 869 self.show_error('invalid syntax', 'expected read <attribute>') 870 return 871 872 if not self.connected_peer: 873 self.show_error('not connected') 874 return 875 876 characteristic = self.find_remote_characteristic(params[0]) 877 if characteristic is None: 878 self.show_error('no such characteristic') 879 return 880 881 value = await characteristic.read_value() 882 self.append_to_output(f'VALUE: 0x{value.hex()}') 883 884 async def do_write(self, params): 885 if not self.connected_peer: 886 self.show_error('not connected') 887 return 888 889 if len(params) != 2: 890 self.show_error('invalid syntax', 'expected write <attribute> <value>') 891 return 892 893 if params[1].upper().startswith("0X"): 894 value = bytes.fromhex(params[1][2:]) # parse as hex string 895 else: 896 try: 897 value = int(params[1]) # try as integer 898 except ValueError: 899 value = str.encode(params[1]) # must be a string 900 901 characteristic = self.find_remote_characteristic(params[0]) 902 if characteristic is None: 903 self.show_error('no such characteristic') 904 return 905 906 # use write with response if supported 907 with_response = characteristic.properties & Characteristic.Properties.WRITE 908 await characteristic.write_value(value, with_response=with_response) 909 910 async def do_local_write(self, params): 911 if len(params) != 2: 912 self.show_error( 913 'invalid syntax', 'expected local-write <attribute> <value>' 914 ) 915 return 916 917 if params[1].upper().startswith("0X"): 918 value = bytes.fromhex(params[1][2:]) # parse as hex string 919 else: 920 try: 921 value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer 922 except ValueError: 923 value = str.encode(params[1]) # must be a string 924 925 attribute = self.find_local_attribute(params[0]) 926 if not attribute: 927 self.show_error('invalid syntax', 'unable to find attribute') 928 return 929 930 # send data to any subscribers 931 if isinstance(attribute, Characteristic): 932 await attribute.write_value(None, value) 933 if attribute.has_properties(Characteristic.NOTIFY): 934 await self.device.gatt_server.notify_subscribers(attribute) 935 if attribute.has_properties(Characteristic.INDICATE): 936 await self.device.gatt_server.indicate_subscribers(attribute) 937 938 async def do_subscribe(self, params): 939 if not self.connected_peer: 940 self.show_error('not connected') 941 return 942 943 if len(params) != 1: 944 self.show_error('invalid syntax', 'expected subscribe <attribute>') 945 return 946 947 characteristic = self.find_remote_characteristic(params[0]) 948 if characteristic is None: 949 self.show_error('no such characteristic') 950 return 951 952 await characteristic.subscribe( 953 lambda value: self.append_to_output( 954 f"{characteristic} VALUE: 0x{value.hex()}" 955 ), 956 ) 957 958 async def do_unsubscribe(self, params): 959 if not self.connected_peer: 960 self.show_error('not connected') 961 return 962 963 if len(params) != 1: 964 self.show_error('invalid syntax', 'expected subscribe <attribute>') 965 return 966 967 characteristic = self.find_remote_characteristic(params[0]) 968 if characteristic is None: 969 self.show_error('no such characteristic') 970 return 971 972 await characteristic.unsubscribe() 973 974 async def do_set_phy(self, params): 975 if len(params) != 1: 976 self.show_error( 977 'invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>' 978 ) 979 return 980 981 if not self.connected_peer: 982 self.show_error('not connected') 983 return 984 985 if '/' in params[0]: 986 tx_phys, rx_phys = params[0].split('/') 987 else: 988 tx_phys = params[0] 989 rx_phys = tx_phys 990 991 await self.connected_peer.connection.set_phy( 992 tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys) 993 ) 994 995 async def do_set_default_phy(self, params): 996 if len(params) != 1: 997 self.show_error( 998 'invalid syntax', 999 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>', 1000 ) 1001 return 1002 1003 if '/' in params[0]: 1004 tx_phys, rx_phys = params[0].split('/') 1005 else: 1006 tx_phys = params[0] 1007 rx_phys = tx_phys 1008 1009 await self.device.set_default_phy( 1010 tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys) 1011 ) 1012 1013 async def do_exit(self, _): 1014 self.ui.exit() 1015 1016 async def do_quit(self, _): 1017 self.ui.exit() 1018 1019 async def do_filter(self, params): 1020 if params[0] == "address": 1021 if len(params) != 2: 1022 self.show_error('invalid syntax', 'expected filter address <pattern>') 1023 return 1024 self.device.listener.address_filter = params[1] 1025 1026 1027# ----------------------------------------------------------------------------- 1028# Device and Connection Listener 1029# ----------------------------------------------------------------------------- 1030class DeviceListener(Device.Listener, Connection.Listener): 1031 def __init__(self, app): 1032 self.app = app 1033 self.scan_results = OrderedDict() 1034 self.address_filter = None 1035 1036 @property 1037 def address_filter(self): 1038 return self._address_filter 1039 1040 @address_filter.setter 1041 def address_filter(self, filter_addr): 1042 if filter_addr is None: 1043 self._address_filter = re.compile(r".*") 1044 else: 1045 self._address_filter = re.compile(filter_addr) 1046 self.scan_results = OrderedDict( 1047 filter(self.filter_address_match, self.scan_results) 1048 ) 1049 self.app.show_scan_results(self.scan_results) 1050 1051 def filter_address_match(self, address): 1052 """ 1053 Returns true if an address matches the filter 1054 """ 1055 return bool(self.address_filter.match(address)) 1056 1057 @AsyncRunner.run_in_task() 1058 # pylint: disable=invalid-overridden-method 1059 async def on_connection(self, connection): 1060 self.app.connected_peer = Peer(connection) 1061 self.app.connection_rssi = None 1062 self.app.append_to_output(f'connected to {self.app.connected_peer}') 1063 connection.listener = self 1064 1065 def on_disconnection(self, reason): 1066 self.app.append_to_output( 1067 f'disconnected from {self.app.connected_peer}, ' 1068 f'reason: {HCI_Constant.error_name(reason)}' 1069 ) 1070 self.app.connected_peer = None 1071 self.app.connection_rssi = None 1072 1073 def on_connection_parameters_update(self): 1074 self.app.append_to_output( 1075 f'connection parameters update: ' 1076 f'{self.app.connected_peer.connection.parameters}' 1077 ) 1078 1079 def on_connection_phy_update(self): 1080 self.app.append_to_output( 1081 f'connection phy update: {self.app.connected_peer.connection.phy}' 1082 ) 1083 1084 def on_connection_att_mtu_update(self): 1085 self.app.append_to_output( 1086 f'connection att mtu update: {self.app.connected_peer.connection.att_mtu}' 1087 ) 1088 1089 def on_connection_encryption_change(self): 1090 encryption_state = ( 1091 'encrypted' 1092 if self.app.connected_peer.connection.is_encrypted 1093 else 'not encrypted' 1094 ) 1095 self.app.append_to_output( 1096 'connection encryption change: ' f'{encryption_state}' 1097 ) 1098 1099 def on_connection_data_length_change(self): 1100 self.app.append_to_output( 1101 'connection data length change: ' 1102 f'{self.app.connected_peer.connection.data_length}' 1103 ) 1104 1105 def on_advertisement(self, advertisement): 1106 if not self.filter_address_match(str(advertisement.address)): 1107 return 1108 1109 entry_key = f'{advertisement.address}/{advertisement.address.address_type}' 1110 entry = self.scan_results.get(entry_key) 1111 if entry: 1112 entry.ad_data = advertisement.data 1113 entry.rssi = advertisement.rssi 1114 entry.connectable = advertisement.is_connectable 1115 else: 1116 self.app.add_known_address(str(advertisement.address)) 1117 self.scan_results[entry_key] = ScanResult( 1118 advertisement.address, 1119 advertisement.address.address_type, 1120 advertisement.data, 1121 advertisement.rssi, 1122 advertisement.is_connectable, 1123 ) 1124 1125 self.app.show_scan_results(self.scan_results) 1126 1127 1128# ----------------------------------------------------------------------------- 1129# Scanning 1130# ----------------------------------------------------------------------------- 1131class ScanResult: 1132 def __init__(self, address, address_type, ad_data, rssi, connectable): 1133 self.address = address 1134 self.address_type = address_type 1135 self.ad_data = ad_data 1136 self.rssi = rssi 1137 self.connectable = connectable 1138 1139 def to_display_string(self): 1140 address_type_string = ('P', 'R', 'PI', 'RI')[self.address_type] 1141 address_color = colors.yellow if self.connectable else colors.red 1142 if address_type_string.startswith('P'): 1143 type_color = colors.green 1144 else: 1145 type_color = colors.cyan 1146 1147 name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) 1148 if name is None: 1149 name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True) 1150 if name: 1151 # Convert to string 1152 try: 1153 name = name.decode() 1154 except UnicodeDecodeError: 1155 name = name.hex() 1156 else: 1157 name = '' 1158 1159 # Remove any '/P' qualifier suffix from the address string 1160 address_str = self.address.to_string(with_type_qualifier=False) 1161 1162 # RSSI bar 1163 bar_string = rssi_bar(self.rssi) 1164 bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string)) 1165 return ( 1166 f'{address_color(address_str)} [{type_color(address_type_string)}] ' 1167 f'{bar_string} {bar_padding} {name}' 1168 ) 1169 1170 1171# ----------------------------------------------------------------------------- 1172# Logging 1173# ----------------------------------------------------------------------------- 1174class LogHandler(logging.Handler): 1175 def __init__(self, app): 1176 super().__init__() 1177 self.app = app 1178 self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')) 1179 1180 def emit(self, record): 1181 message = self.format(record) 1182 self.app.append_to_log(message) 1183 1184 1185# ----------------------------------------------------------------------------- 1186# Main 1187# ----------------------------------------------------------------------------- 1188@click.command() 1189@click.option('--device-config', help='Device configuration file') 1190@click.argument('transport') 1191def main(device_config, transport): 1192 # Ensure that the BUMBLE_USER_DIR directory exists 1193 if not os.path.isdir(BUMBLE_USER_DIR): 1194 os.mkdir(BUMBLE_USER_DIR) 1195 1196 # Create an instance of the app 1197 app = ConsoleApp() 1198 1199 # Setup logging 1200 # logging.basicConfig(level = 'FATAL') 1201 # logging.basicConfig(level = 'DEBUG') 1202 root_logger = logging.getLogger() 1203 1204 root_logger.addHandler(LogHandler(app)) 1205 root_logger.setLevel(logging.DEBUG) 1206 1207 # Run until the user exits 1208 asyncio.run(app.run_async(device_config, transport)) 1209 1210 1211# ----------------------------------------------------------------------------- 1212if __name__ == "__main__": 1213 main() # pylint: disable=no-value-for-parameter 1214