1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import click
22
23import bumble.core
24from bumble.colors import color
25from bumble.device import Device, Peer
26from bumble.gatt import show_services
27from bumble.transport import open_transport_or_link
28
29
30# -----------------------------------------------------------------------------
async def dump_gatt_db(peer, done):
    """
    Discover and print the GATT database exposed by `peer`.

    The services, their characteristics and the characteristics' descriptors are
    discovered and printed first. Then every attribute is discovered, printed and
    read; the value that is read is printed as hex, and a read that fails with a
    protocol error or a timeout is reported in red without stopping the dump.

    Args:
        peer: The GATT peer to query (the callers in this file pass a `Peer`).
        done: Either None, or a future used to report completion to a waiter.
            When the dump succeeds the future is resolved with None. When the dump
            fails the exception is stored in the future instead of being raised.

    Raises:
        Exception: Any error raised while dumping, when `done` is None or has
            already been resolved (there is then no waiter to report to).
    """
    try:
        # Discover all services
        print(color('### Discovering Services and Characteristics', 'magenta'))
        await peer.discover_services()
        for service in peer.services:
            await service.discover_characteristics()
            for characteristic in service.characteristics:
                await characteristic.discover_descriptors()

        print(color('=== Services ===', 'yellow'))
        show_services(peer.services)
        print()

        # Discover all attributes
        print(color('=== All Attributes ===', 'yellow'))
        attributes = await peer.discover_attributes()
        for attribute in attributes:
            print(attribute)
            try:
                value = await attribute.read_value()
                print(color(f'{value.hex()}', 'green'))
            except bumble.core.ProtocolError as error:
                print(color(error, 'red'))
            except bumble.core.TimeoutError:
                print(color('read timeout', 'red'))
    except Exception as error:  # pylint: disable=broad-except
        # When a waiter is blocked on `done`, an exception that only propagates
        # out of this coroutine would leave that waiter blocked forever, so hand
        # the failure over to the waiter instead.
        if done is None or done.done():
            raise
        done.set_exception(error)
        return

    # Resolving a future twice raises InvalidStateError, which can happen if the
    # same future is shared by the dumps of several connections.
    if done is not None and not done.done():
        done.set_result(None)
59
60
61# -----------------------------------------------------------------------------
async def async_main(device_config, encrypt, transport, address_or_name):
    """
    Open the transport, power on a local device and dump a peer's GATT database.

    When `address_or_name` is truthy, connect to that peer (encrypting the
    connection first if `encrypt` is truthy) and dump its database. Otherwise
    start advertising and dump the database of a peer that connects.

    Args:
        device_config: Device configuration file to create the device from, or a
            falsy value to create a default device named 'Bumble' with the
            address F0:F1:F2:F3:F4:F5.
        encrypt: Whether to encrypt the connection before dumping. Only used
            when `address_or_name` is given.
        transport: Transport specification passed to `open_transport_or_link`.
        address_or_name: Target passed to `device.connect`, or a falsy value to
            wait for an incoming connection.
    """
    async with await open_transport_or_link(transport) as (hci_source, hci_sink):

        # Create a device
        if device_config:
            device = Device.from_config_file_with_hci(
                device_config, hci_source, hci_sink
            )
        else:
            device = Device.with_hci(
                'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
            )
        await device.power_on()

        if address_or_name:
            # Connect to the target peer
            print(color('>>> Connecting...', 'green'))
            connection = await device.connect(address_or_name)
            print(color('>>> Connected', 'green'))

            # Encrypt the connection if required
            if encrypt:
                print(color('+++ Encrypting connection...', 'blue'))
                await connection.encrypt()
                print(color('+++ Encryption established', 'blue'))

            await dump_gatt_db(Peer(connection), None)
        else:
            # Wait for a connection
            done = asyncio.get_running_loop().create_future()

            # The event loop only keeps weak references to tasks, so a task that
            # nothing else references may be garbage-collected before it
            # finishes. Keep each dump task here until it completes.
            dump_tasks = set()

            def on_dump_finished(task):
                dump_tasks.discard(task)
                if task.cancelled():
                    return

                # Retrieve the outcome so that a failed dump wakes up the waiter
                # below instead of leaving it blocked forever.
                error = task.exception()
                if error is None:
                    return
                if done.done():
                    logging.warning('GATT dump task failed: %s', error)
                else:
                    done.set_exception(error)

            def on_connection(connection):
                task = asyncio.create_task(dump_gatt_db(Peer(connection), done))
                dump_tasks.add(task)
                task.add_done_callback(on_dump_finished)

            device.on('connection', on_connection)
            await device.start_advertising(auto_restart=True)

            print(color('### Waiting for connection...', 'blue'))
            await done
102
103
104# -----------------------------------------------------------------------------
@click.command()
@click.option('--device-config', help='Device configuration', type=click.Path())
@click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False)
@click.argument('transport')
@click.argument('address-or-name', required=False)
def main(device_config, encrypt, transport, address_or_name):
    """
    Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified,
    wait for an incoming connection.
    """
    # The log level is taken from the BUMBLE_LOGLEVEL environment variable,
    # falling back to 'INFO' when the variable is not set.
    log_level = os.getenv('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)

    # Drive the asynchronous implementation to completion.
    dump = async_main(device_config, encrypt, transport, address_or_name)
    asyncio.run(dump)
117
118
119# -----------------------------------------------------------------------------
# Run the command-line entry point only when this file is executed as a script,
# not when it is imported. `main` is a click command, so it is called without
# arguments here.
if __name__ == '__main__':
    main()
122