# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import click

import bumble.core
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.gatt import show_services
from bumble.transport import open_transport_or_link


# -----------------------------------------------------------------------------
async def _discover_and_print(peer):
    """Discover and print the services and attributes exposed by `peer`."""
    # Discover all services
    print(color('### Discovering Services and Characteristics', 'magenta'))
    await peer.discover_services()
    for service in peer.services:
        await service.discover_characteristics()
        for characteristic in service.characteristics:
            await characteristic.discover_descriptors()

    print(color('=== Services ===', 'yellow'))
    show_services(peer.services)
    print()

    # Discover all attributes
    print(color('=== All Attributes ===', 'yellow'))
    attributes = await peer.discover_attributes()
    for attribute in attributes:
        print(attribute)
        # A failed read of one attribute is reported inline and must not stop
        # the dump of the remaining attributes.
        try:
            value = await attribute.read_value()
            print(color(f'{value.hex()}', 'green'))
        except bumble.core.ProtocolError as error:
            print(color(error, 'red'))
        except bumble.core.TimeoutError:
            print(color('read timeout', 'red'))


# -----------------------------------------------------------------------------
async def dump_gatt_db(peer, done):
    """
    Discover and print the GATT services and attributes of `peer`.

    Args:
        peer: the connected peer whose GATT database is dumped.
        done: optional future used to signal completion to a waiter, or None.
            It is resolved with None on success. If the dump fails, the
            exception is stored in the future so that the waiter is woken up
            instead of waiting forever.

    Raises:
        Any exception raised during discovery when `done` is None (or is
        already resolved and so cannot carry the error).
    """
    try:
        await _discover_and_print(peer)
    except Exception as error:  # pylint: disable=broad-except
        if done is None or done.done():
            raise
        # Hand the failure over to whoever is awaiting `done`.
        done.set_exception(error)
        return

    # The future may already be resolved if another dump sharing it finished
    # first; resolving it twice would raise InvalidStateError.
    if done is not None and not done.done():
        done.set_result(None)


# -----------------------------------------------------------------------------
async def async_main(device_config, encrypt, transport, address_or_name):
    """
    Open the transport, power on a device and dump a peer's GATT database.

    Args:
        device_config: path of a device configuration file, or a falsy value
            to use a default device named 'Bumble'.
        encrypt: when true, encrypt an outgoing connection before dumping.
        transport: transport specification passed to open_transport_or_link.
        address_or_name: peer to connect to. When falsy, advertise and wait
            for an incoming connection instead.
    """
    async with await open_transport_or_link(transport) as (hci_source, hci_sink):

        # Create a device
        if device_config:
            device = Device.from_config_file_with_hci(
                device_config, hci_source, hci_sink
            )
        else:
            device = Device.with_hci(
                'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
            )
        await device.power_on()

        if address_or_name:
            # Connect to the target peer
            print(color('>>> Connecting...', 'green'))
            connection = await device.connect(address_or_name)
            print(color('>>> Connected', 'green'))

            # Encrypt the connection if required
            if encrypt:
                print(color('+++ Encrypting connection...', 'blue'))
                await connection.encrypt()
                print(color('+++ Encryption established', 'blue'))

            await dump_gatt_db(Peer(connection), None)
        else:
            # Wait for a connection
            done = asyncio.get_running_loop().create_future()

            # The event loop only keeps weak references to tasks, so hold a
            # strong reference until each dump task has finished.
            dump_tasks = set()

            def on_connection(connection):
                task = asyncio.create_task(dump_gatt_db(Peer(connection), done))
                dump_tasks.add(task)
                task.add_done_callback(dump_tasks.discard)

            device.on('connection', on_connection)
            await device.start_advertising(auto_restart=True)

            print(color('### Waiting for connection...', 'blue'))
            await done


# -----------------------------------------------------------------------------
@click.command()
@click.option('--device-config', help='Device configuration', type=click.Path())
@click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False)
@click.argument('transport')
@click.argument('address-or-name', required=False)
def main(device_config, encrypt, transport, address_or_name):
    """
    Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified,
    wait for an incoming connection.
    """
    # The log level comes from the BUMBLE_LOGLEVEL environment variable
    # (default: INFO).
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
    asyncio.run(async_main(device_config, encrypt, transport, address_or_name))


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    main()