1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Bumble Tool
17# -----------------------------------------------------------------------------
18
19# -----------------------------------------------------------------------------
20# Imports
21# -----------------------------------------------------------------------------
22import asyncio
23import logging
24import os
25import random
26import re
27import humanize
28from typing import Optional, Union
29from collections import OrderedDict
30
31import click
32from prettytable import PrettyTable
33
34from prompt_toolkit import Application
35from prompt_toolkit.history import FileHistory
36from prompt_toolkit.completion import Completer, Completion, NestedCompleter
37from prompt_toolkit.key_binding import KeyBindings
38from prompt_toolkit.formatted_text import ANSI
39from prompt_toolkit.styles import Style
40from prompt_toolkit.filters import Condition
41from prompt_toolkit.widgets import TextArea, Frame
42from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
43from prompt_toolkit.data_structures import Point
44from prompt_toolkit.layout import (
45    Layout,
46    HSplit,
47    Window,
48    CompletionsMenu,
49    Float,
50    FormattedTextControl,
51    FloatContainer,
52    ConditionalContainer,
53    Dimension,
54)
55
56from bumble import __version__
57import bumble.core
58from bumble import colors
59from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
60from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
61from bumble.utils import AsyncRunner
62from bumble.transport import open_transport_or_link
63from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
64from bumble.gatt_client import CharacteristicProxy
65from bumble.hci import (
66    Address,
67    HCI_Constant,
68    HCI_LE_1M_PHY,
69    HCI_LE_2M_PHY,
70    HCI_LE_CODED_PHY,
71)
72
73
74# -----------------------------------------------------------------------------
75# Constants
76# -----------------------------------------------------------------------------
77BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
78DEFAULT_RSSI_BAR_WIDTH = 20
79DEFAULT_CONNECTION_TIMEOUT = 30.0
80DISPLAY_MIN_RSSI = -100
81DISPLAY_MAX_RSSI = -30
82RSSI_MONITOR_INTERVAL = 5.0  # Seconds
83
84
85# -----------------------------------------------------------------------------
86# Utils
87# -----------------------------------------------------------------------------
88
89
90def le_phy_name(phy_id):
91    return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get(
92        phy_id, HCI_Constant.le_phy_name(phy_id)
93    )
94
95
96def rssi_bar(rssi):
97    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
98    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
99    bar_width = min(max(bar_width, 0), 1)
100    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
101    bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
102    return f'{rssi:4} {bar_blocks}'
103
104
105def parse_phys(phys):
106    if phys.lower() == '*':
107        return None
108
109    phy_list = []
110    elements = phys.lower().split(',')
111    for element in elements:
112        if element == '1m':
113            phy_list.append(HCI_LE_1M_PHY)
114        elif element == '2m':
115            phy_list.append(HCI_LE_2M_PHY)
116        elif element == 'coded':
117            phy_list.append(HCI_LE_CODED_PHY)
118        else:
119            raise ValueError('invalid PHY name')
120    return phy_list
121
122
123# -----------------------------------------------------------------------------
124# Console App
125# -----------------------------------------------------------------------------
126class ConsoleApp:
127    connected_peer: Optional[Peer]
128
129    def __init__(self):
130        self.known_addresses = set()
131        self.known_remote_attributes = []
132        self.known_local_attributes = []
133        self.device = None
134        self.connected_peer = None
135        self.top_tab = 'device'
136        self.monitor_rssi = False
137        self.connection_rssi = None
138
139        style = Style.from_dict(
140            {
141                'output-field': 'bg:#000044 #ffffff',
142                'input-field': 'bg:#000000 #ffffff',
143                'line': '#004400',
144                'error': 'fg:ansired',
145            }
146        )
147
148        class LiveCompleter(Completer):
149            def __init__(self, words):
150                self.words = words
151
152            def get_completions(self, document, complete_event):
153                prefix = document.text_before_cursor.upper()
154                for word in [x for x in self.words if x.upper().startswith(prefix)]:
155                    yield Completion(word, start_position=-len(prefix))
156
157        def make_completer():
158            return NestedCompleter.from_nested_dict(
159                {
160                    'scan': {'on': None, 'off': None, 'clear': None},
161                    'advertise': {'on': None, 'off': None},
162                    'rssi': {'on': None, 'off': None},
163                    'show': {
164                        'scan': None,
165                        'log': None,
166                        'device': None,
167                        'local-services': None,
168                        'remote-services': None,
169                        'local-values': None,
170                        'remote-values': None,
171                    },
172                    'filter': {
173                        'address': None,
174                    },
175                    'connect': LiveCompleter(self.known_addresses),
176                    'update-parameters': None,
177                    'encrypt': None,
178                    'disconnect': None,
179                    'discover': {'services': None, 'attributes': None},
180                    'request-mtu': None,
181                    'read': LiveCompleter(self.known_remote_attributes),
182                    'write': LiveCompleter(self.known_remote_attributes),
183                    'local-write': LiveCompleter(self.known_local_attributes),
184                    'subscribe': LiveCompleter(self.known_remote_attributes),
185                    'unsubscribe': LiveCompleter(self.known_remote_attributes),
186                    'set-phy': {'1m': None, '2m': None, 'coded': None},
187                    'set-default-phy': None,
188                    'quit': None,
189                    'exit': None,
190                }
191            )
192
193        self.input_field = TextArea(
194            height=1,
195            prompt="> ",
196            multiline=False,
197            wrap_lines=False,
198            completer=make_completer(),
199            history=FileHistory(os.path.join(BUMBLE_USER_DIR, 'history')),
200        )
201
202        self.input_field.accept_handler = self.accept_input
203
204        self.output_height = Dimension(min=7, max=7, weight=1)
205        self.output_lines = []
206        self.output = FormattedTextControl(
207            get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1))
208        )
209        self.output_max_lines = 20
210        self.scan_results_text = FormattedTextControl()
211        self.local_services_text = FormattedTextControl()
212        self.remote_services_text = FormattedTextControl()
213        self.device_text = FormattedTextControl()
214        self.log_text = FormattedTextControl(
215            get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
216        )
217        self.local_values_text = FormattedTextControl()
218        self.remote_values_text = FormattedTextControl()
219        self.log_height = Dimension(min=7, weight=4)
220        self.log_max_lines = 100
221        self.log_lines = []
222
223        container = HSplit(
224            [
225                ConditionalContainer(
226                    Frame(Window(self.scan_results_text), title='Scan Results'),
227                    filter=Condition(lambda: self.top_tab == 'scan'),
228                ),
229                ConditionalContainer(
230                    Frame(Window(self.local_services_text), title='Local Services'),
231                    filter=Condition(lambda: self.top_tab == 'local-services'),
232                ),
233                ConditionalContainer(
234                    Frame(Window(self.local_values_text), title='Local Values'),
235                    filter=Condition(lambda: self.top_tab == 'local-values'),
236                ),
237                ConditionalContainer(
238                    Frame(Window(self.remote_services_text), title='Remote Services'),
239                    filter=Condition(lambda: self.top_tab == 'remote-services'),
240                ),
241                ConditionalContainer(
242                    Frame(Window(self.remote_values_text), title='Remote Values'),
243                    filter=Condition(lambda: self.top_tab == 'remote-values'),
244                ),
245                ConditionalContainer(
246                    Frame(Window(self.log_text, height=self.log_height), title='Log'),
247                    filter=Condition(lambda: self.top_tab == 'log'),
248                ),
249                ConditionalContainer(
250                    Frame(Window(self.device_text), title='Device'),
251                    filter=Condition(lambda: self.top_tab == 'device'),
252                ),
253                Frame(Window(self.output, height=self.output_height)),
254                FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
255                self.input_field,
256            ]
257        )
258
259        container = FloatContainer(
260            container,
261            floats=[
262                Float(
263                    xcursor=True,
264                    ycursor=True,
265                    content=CompletionsMenu(max_height=16, scroll_offset=1),
266                ),
267            ],
268        )
269
270        layout = Layout(container, focused_element=self.input_field)
271
272        key_bindings = KeyBindings()
273
274        @key_bindings.add("c-c")
275        @key_bindings.add("c-q")
276        def _(event):
277            event.app.exit()
278
279        # pylint: disable=invalid-name
280        self.ui = Application(
281            layout=layout, style=style, key_bindings=key_bindings, full_screen=True
282        )
283
284    async def run_async(self, device_config, transport):
285        rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
286
287        async with await open_transport_or_link(transport) as (hci_source, hci_sink):
288            if device_config:
289                self.device = Device.from_config_file_with_hci(
290                    device_config, hci_source, hci_sink
291                )
292            else:
293                random_address = Address.generate_static_address()
294                self.append_to_log(f"Setting random address: {random_address}")
295                self.device = Device.with_hci(
296                    'Bumble', random_address, hci_source, hci_sink
297                )
298            self.device.listener = DeviceListener(self)
299            await self.device.power_on()
300            self.show_device(self.device)
301            self.show_local_services(self.device.gatt_server.attributes)
302
303            # Run the UI
304            await self.ui.run_async()
305
306        rssi_monitoring_task.cancel()
307
308    def add_known_address(self, address):
309        self.known_addresses.add(address)
310
311    def accept_input(self, _):
312        if len(self.input_field.text) == 0:
313            return
314        self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False)
315        self.ui.create_background_task(self.command(self.input_field.text))
316
317    def get_status_bar_text(self):
318        scanning = "ON" if self.device and self.device.is_scanning else "OFF"
319
320        connection_state = 'NONE'
321        encryption_state = ''
322        att_mtu = ''
323        rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
324
325        if self.device:
326            if self.device.is_le_connecting:
327                connection_state = 'CONNECTING'
328            elif self.connected_peer:
329                connection = self.connected_peer.connection
330                connection_parameters = (
331                    f'{connection.parameters.connection_interval}/'
332                    f'{connection.parameters.peripheral_latency}/'
333                    f'{connection.parameters.supervision_timeout}'
334                )
335                if connection.transport == BT_LE_TRANSPORT:
336                    phy_state = (
337                        f' RX={le_phy_name(connection.phy.rx_phy)}/'
338                        f'TX={le_phy_name(connection.phy.tx_phy)}'
339                    )
340                else:
341                    phy_state = ''
342                connection_state = (
343                    f'{connection.peer_address} '
344                    f'{connection_parameters} '
345                    f'{connection.data_length}'
346                    f'{phy_state}'
347                )
348                encryption_state = (
349                    'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
350                )
351                att_mtu = f'ATT_MTU: {connection.att_mtu}'
352
353        return [
354            ('ansigreen', f' SCAN: {scanning} '),
355            ('', '  '),
356            ('ansiblue', f' CONNECTION: {connection_state} '),
357            ('', '  '),
358            ('ansimagenta', f' {encryption_state} '),
359            ('', '  '),
360            ('ansicyan', f' {att_mtu} '),
361            ('', '  '),
362            ('ansiyellow', f' {rssi} '),
363        ]
364
365    def show_error(self, title, details=None):
366        appended = [('class:error', title)]
367        if details:
368            appended.append(('', f' {details}'))
369        self.append_to_output(appended)
370
371    def show_scan_results(self, scan_results):
372        max_lines = 40  # TEMP
373        lines = []
374        keys = list(scan_results.keys())[:max_lines]
375        for key in keys:
376            lines.append(scan_results[key].to_display_string())
377        self.scan_results_text.text = ANSI('\n'.join(lines))
378        self.ui.invalidate()
379
380    def show_remote_services(self, services):
381        lines = []
382        del self.known_remote_attributes[:]
383        for service in services:
384            lines.append(("ansicyan", f"{service}\n"))
385
386            for characteristic in service.characteristics:
387                lines.append(('ansimagenta', f'  {characteristic} + \n'))
388                self.known_remote_attributes.append(
389                    f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
390                )
391                self.known_remote_attributes.append(
392                    f'*.{characteristic.uuid.to_hex_str()}'
393                )
394                self.known_remote_attributes.append(f'#{characteristic.handle:X}')
395                for descriptor in characteristic.descriptors:
396                    lines.append(("ansigreen", f"    {descriptor}\n"))
397
398        self.remote_services_text.text = lines
399        self.ui.invalidate()
400
401    def show_local_services(self, attributes):
402        lines = []
403        del self.known_local_attributes[:]
404        for attribute in attributes:
405            if isinstance(attribute, Service):
406                # Save the most recent service for use later
407                service = attribute
408                lines.append(("ansicyan", f"{attribute}\n"))
409            elif isinstance(attribute, Characteristic):
410                # CharacteristicDeclaration includes all info from Characteristic
411                # no need to print it twice
412                continue
413            elif isinstance(attribute, CharacteristicDeclaration):
414                # Save the most recent characteristic declaration for use later
415                characteristic_declaration = attribute
416                self.known_local_attributes.append(
417                    f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
418                )
419                self.known_local_attributes.append(
420                    f'#{attribute.characteristic.handle:X}'
421                )
422                lines.append(("ansimagenta", f"  {attribute}\n"))
423            elif isinstance(attribute, Descriptor):
424                self.known_local_attributes.append(
425                    f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
426                )
427                self.known_local_attributes.append(f'#{attribute.handle:X}')
428                lines.append(("ansigreen", f"    {attribute}\n"))
429            else:
430                lines.append(("ansiyellow", f"{attribute}\n"))
431
432        self.local_services_text.text = lines
433        self.ui.invalidate()
434
435    def show_device(self, device):
436        lines = []
437
438        lines.append(('ansicyan', 'Bumble Version:       '))
439        lines.append(('', f'{__version__}\n'))
440        lines.append(('ansicyan', 'Name:                 '))
441        lines.append(('', f'{device.name}\n'))
442        lines.append(('ansicyan', 'Public Address:       '))
443        lines.append(('', f'{device.public_address}\n'))
444        lines.append(('ansicyan', 'Random Address:       '))
445        lines.append(('', f'{device.random_address}\n'))
446        lines.append(('ansicyan', 'LE Enabled:           '))
447        lines.append(('', f'{device.le_enabled}\n'))
448        lines.append(('ansicyan', 'Classic Enabled:      '))
449        lines.append(('', f'{device.classic_enabled}\n'))
450        lines.append(('ansicyan', 'Classic SC Enabled:   '))
451        lines.append(('', f'{device.classic_sc_enabled}\n'))
452        lines.append(('ansicyan', 'Classic SSP Enabled:  '))
453        lines.append(('', f'{device.classic_ssp_enabled}\n'))
454        lines.append(('ansicyan', 'Classic Class:        '))
455        lines.append(('', f'{device.class_of_device}\n'))
456        lines.append(('ansicyan', 'Discoverable:         '))
457        lines.append(('', f'{device.discoverable}\n'))
458        lines.append(('ansicyan', 'Connectable:          '))
459        lines.append(('', f'{device.connectable}\n'))
460        lines.append(('ansicyan', 'Advertising Data:     '))
461        lines.append(('', f'{device.advertising_data}\n'))
462        lines.append(('ansicyan', 'Scan Response Data:   '))
463        lines.append(('', f'{device.scan_response_data}\n'))
464        advertising_interval = (
465            device.advertising_interval_min
466            if device.advertising_interval_min == device.advertising_interval_max
467            else (
468                f'{device.advertising_interval_min} to '
469                f'{device.advertising_interval_max}'
470            )
471        )
472        lines.append(('ansicyan', 'Advertising Interval: '))
473        lines.append(('', f'{advertising_interval}\n'))
474
475        self.device_text.text = lines
476        self.ui.invalidate()
477
478    def append_to_output(self, line, invalidate=True):
479        if isinstance(line, str):
480            line = [('', line)]
481        self.output_lines = self.output_lines[-self.output_max_lines :]
482        self.output_lines.append(line)
483        formatted_text = []
484        for line in self.output_lines:
485            formatted_text += line
486            formatted_text.append(('', '\n'))
487        self.output.text = formatted_text
488        if invalidate:
489            self.ui.invalidate()
490
491    def append_to_log(self, lines, invalidate=True):
492        self.log_lines.extend(lines.split('\n'))
493        self.log_lines = self.log_lines[-self.log_max_lines :]
494        self.log_text.text = ANSI('\n'.join(self.log_lines))
495        if invalidate:
496            self.ui.invalidate()
497
498    async def discover_services(self):
499        if not self.connected_peer:
500            self.show_error('not connected')
501            return
502
503        self.append_to_output('Service Discovery starting...')
504        await self.connected_peer.discover_all()
505        self.append_to_output('Service Discovery done!')
506
507    async def discover_attributes(self):
508        if not self.connected_peer:
509            self.show_error('not connected')
510            return
511
512        # Discover all attributes
513        self.append_to_output('discovering attributes...')
514        attributes = await self.connected_peer.discover_attributes()
515        self.append_to_output(f'discovered {len(attributes)} attributes...')
516
517        self.show_attributes(attributes)
518
519    def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
520        if not self.connected_peer:
521            return None
522        parts = param.split('.')
523        if len(parts) == 2:
524            service_uuid = UUID(parts[0]) if parts[0] != '*' else None
525            characteristic_uuid = UUID(parts[1])
526            for service in self.connected_peer.services:
527                if service_uuid is None or service.uuid == service_uuid:
528                    for characteristic in service.characteristics:
529                        if characteristic.uuid == characteristic_uuid:
530                            return characteristic
531        elif len(parts) == 1:
532            if parts[0].startswith('#'):
533                attribute_handle = int(f'{parts[0][1:]}', 16)
534                for service in self.connected_peer.services:
535                    for characteristic in service.characteristics:
536                        if characteristic.handle == attribute_handle:
537                            return characteristic
538
539        return None
540
541    def find_local_attribute(
542        self, param
543    ) -> Optional[Union[Characteristic, Descriptor]]:
544        parts = param.split('.')
545        if len(parts) == 3:
546            service_uuid = UUID(parts[0])
547            characteristic_uuid = UUID(parts[1])
548            descriptor_uuid = UUID(parts[2])
549            return self.device.gatt_server.get_descriptor_attribute(
550                service_uuid, characteristic_uuid, descriptor_uuid
551            )
552        if len(parts) == 2:
553            service_uuid = UUID(parts[0])
554            characteristic_uuid = UUID(parts[1])
555            characteristic_attributes = (
556                self.device.gatt_server.get_characteristic_attributes(
557                    service_uuid, characteristic_uuid
558                )
559            )
560            if characteristic_attributes:
561                return characteristic_attributes[1]
562            return None
563        elif len(parts) == 1:
564            if parts[0].startswith('#'):
565                attribute_handle = int(f'{parts[0][1:]}', 16)
566                attribute = self.device.gatt_server.get_attribute(attribute_handle)
567                if isinstance(attribute, (Characteristic, Descriptor)):
568                    return attribute
569                return None
570
571        return None
572
573    async def rssi_monitor_loop(self):
574        while True:
575            if self.monitor_rssi and self.connected_peer:
576                self.connection_rssi = await self.connected_peer.connection.get_rssi()
577            await asyncio.sleep(RSSI_MONITOR_INTERVAL)
578
579    async def command(self, command):
580        try:
581            (keyword, *params) = command.strip().split(' ')
582            keyword = keyword.replace('-', '_').lower()
583            handler = getattr(self, f'do_{keyword}', None)
584            if handler:
585                await handler(params)
586                self.ui.invalidate()
587            else:
588                self.show_error('unknown command', keyword)
589        except Exception as error:
590            self.show_error(str(error))
591
592    async def do_scan(self, params):
593        if len(params) == 0:
594            # Toggle scanning
595            if self.device.is_scanning:
596                await self.device.stop_scanning()
597            else:
598                await self.device.start_scanning()
599        elif params[0] == 'on':
600            if len(params) == 2:
601                if not params[1].startswith("filter="):
602                    self.show_error(
603                        'invalid syntax',
604                        'expected address filter=key1:value1,key2:value,... '
605                        'available filters: address',
606                    )
607                # regex: (word):(any char except ,)
608                matches = re.findall(r"(\w+):([^,]+)", params[1])
609                for match in matches:
610                    if match[0] == "address":
611                        self.device.listener.address_filter = match[1]
612
613            await self.device.start_scanning()
614            self.top_tab = 'scan'
615        elif params[0] == 'off':
616            await self.device.stop_scanning()
617        elif params[0] == 'clear':
618            self.device.listener.scan_results.clear()
619            self.known_addresses.clear()
620            self.show_scan_results(self.device.listener.scan_results)
621        else:
622            self.show_error('unsupported arguments for scan command')
623
624    async def do_rssi(self, params):
625        if len(params) == 0:
626            # Toggle monitoring
627            self.monitor_rssi = not self.monitor_rssi
628        elif params[0] == 'on':
629            self.monitor_rssi = True
630        elif params[0] == 'off':
631            self.monitor_rssi = False
632        else:
633            self.show_error('unsupported arguments for rssi command')
634
635    async def do_connect(self, params):
636        if len(params) != 1 and len(params) != 2:
637            self.show_error('invalid syntax', 'expected connect <address> [phys]')
638            return
639
640        if len(params) == 1:
641            phys = None
642        else:
643            phys = parse_phys(params[1])
644        if phys is None:
645            connection_parameters_preferences = None
646        else:
647            connection_parameters_preferences = {
648                phy: ConnectionParametersPreferences() for phy in phys
649            }
650
651        if self.device.is_scanning:
652            await self.device.stop_scanning()
653
654        self.append_to_output('connecting...')
655
656        try:
657            await self.device.connect(
658                params[0],
659                connection_parameters_preferences=connection_parameters_preferences,
660                timeout=DEFAULT_CONNECTION_TIMEOUT,
661            )
662            self.top_tab = 'services'
663        except bumble.core.TimeoutError:
664            self.show_error('connection timed out')
665
666    async def do_disconnect(self, _):
667        if self.device.is_le_connecting:
668            await self.device.cancel_connection()
669        else:
670            if not self.connected_peer:
671                self.show_error('not connected')
672                return
673
674            await self.connected_peer.connection.disconnect()
675
676    async def do_update_parameters(self, params):
677        if len(params) != 1 or len(params[0].split('/')) != 3:
678            self.show_error(
679                'invalid syntax',
680                'expected update-parameters <interval-min>-<interval-max>'
681                '/<max-latency>/<supervision>',
682            )
683            return
684
685        if not self.connected_peer:
686            self.show_error('not connected')
687            return
688
689        connection_intervals, max_latency, supervision_timeout = params[0].split('/')
690        connection_interval_min, connection_interval_max = [
691            int(x) for x in connection_intervals.split('-')
692        ]
693        max_latency = int(max_latency)
694        supervision_timeout = int(supervision_timeout)
695        await self.connected_peer.connection.update_parameters(
696            connection_interval_min,
697            connection_interval_max,
698            max_latency,
699            supervision_timeout,
700        )
701
702    async def do_encrypt(self, _):
703        if not self.connected_peer:
704            self.show_error('not connected')
705            return
706
707        await self.connected_peer.connection.encrypt()
708
709    async def do_advertise(self, params):
710        if len(params) == 0:
711            # Toggle advertising
712            if self.device.is_advertising:
713                await self.device.stop_advertising()
714            else:
715                await self.device.start_advertising()
716        elif params[0] == 'on':
717            await self.device.start_advertising()
718        elif params[0] == 'off':
719            await self.device.stop_advertising()
720        else:
721            self.show_error('unsupported arguments for advertise command')
722
723    async def do_show(self, params):
724        if params:
725            if params[0] in {
726                'scan',
727                'log',
728                'device',
729                'local-services',
730                'remote-services',
731                'local-values',
732                'remote-values',
733            }:
734                self.top_tab = params[0]
735                self.ui.invalidate()
736
737        while self.top_tab == 'local-values':
738            await self.do_show_local_values()
739            await asyncio.sleep(1)
740
741        while self.top_tab == 'remote-values':
742            await self.do_show_remote_values()
743            await asyncio.sleep(1)
744
745    async def do_show_local_values(self):
746        prettytable = PrettyTable()
747        field_names = ["Service", "Characteristic", "Descriptor"]
748
749        # if there's no connections, add a column just for value
750        if not self.device.connections:
751            field_names.append("Value")
752
753        # if there are connections, add a column for each connection's value
754        for connection in self.device.connections.values():
755            field_names.append(f"Connection {connection.handle}")
756
757        for attribute in self.device.gatt_server.attributes:
758            if isinstance(attribute, Characteristic):
759                service = self.device.gatt_server.get_attribute_group(
760                    attribute.handle, Service
761                )
762                if not service:
763                    continue
764                values = [
765                    await attribute.read_value(connection)
766                    for connection in self.device.connections.values()
767                ]
768                if not values:
769                    values = [attribute.read_value(None)]
770                prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
771
772            elif isinstance(attribute, Descriptor):
773                service = self.device.gatt_server.get_attribute_group(
774                    attribute.handle, Service
775                )
776                if not service:
777                    continue
778                characteristic = self.device.gatt_server.get_attribute_group(
779                    attribute.handle, Characteristic
780                )
781                if not characteristic:
782                    continue
783                values = [
784                    await attribute.read_value(connection)
785                    for connection in self.device.connections.values()
786                ]
787                if not values:
788                    values = [await attribute.read_value(None)]
789
790                # TODO: future optimization: convert CCCD value to human readable string
791
792                prettytable.add_row(
793                    [service.uuid, characteristic.uuid, attribute.type] + values
794                )
795
796        prettytable.field_names = field_names
797        self.local_values_text.text = prettytable.get_string()
798        self.ui.invalidate()
799
800    async def do_show_remote_values(self):
801        prettytable = PrettyTable(
802            field_names=[
803                "Connection",
804                "Service",
805                "Characteristic",
806                "Descriptor",
807                "Time",
808                "Value",
809            ]
810        )
811        for connection in self.device.connections.values():
812            for handle, (time, value) in connection.gatt_client.cached_values.items():
813                row = [connection.handle]
814                attribute = connection.gatt_client.get_attributes(handle)
815                if not attribute:
816                    continue
817                if len(attribute) == 3:
818                    row.extend(
819                        [attribute[0].uuid, attribute[1].uuid, attribute[2].type]
820                    )
821                elif len(attribute) == 2:
822                    row.extend([attribute[0].uuid, attribute[1].uuid, ""])
823                elif len(attribute) == 1:
824                    row.extend([attribute[0].uuid, "", ""])
825                else:
826                    continue
827
828                row.extend([humanize.naturaltime(time), value])
829                prettytable.add_row(row)
830
831        self.remote_values_text.text = prettytable.get_string()
832        self.ui.invalidate()
833
834    async def do_get_phy(self, _):
835        if not self.connected_peer:
836            self.show_error('not connected')
837            return
838
839        phy = await self.connected_peer.connection.get_phy()
840        self.append_to_output(
841            f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
842            f'TX={HCI_Constant.le_phy_name(phy[1])}'
843        )
844
845    async def do_request_mtu(self, params):
846        if len(params) != 1:
847            self.show_error('invalid syntax', 'expected request-mtu <mtu>')
848            return
849
850        if not self.connected_peer:
851            self.show_error('not connected')
852            return
853
854        await self.connected_peer.request_mtu(int(params[0]))
855
856    async def do_discover(self, params):
857        if not params:
858            self.show_error('invalid syntax', 'expected discover services|attributes')
859            return
860
861        discovery_type = params[0]
862        if discovery_type == 'services':
863            await self.discover_services()
864        elif discovery_type == 'attributes':
865            await self.discover_attributes()
866
867    async def do_read(self, params):
868        if len(params) != 1:
869            self.show_error('invalid syntax', 'expected read <attribute>')
870            return
871
872        if not self.connected_peer:
873            self.show_error('not connected')
874            return
875
876        characteristic = self.find_remote_characteristic(params[0])
877        if characteristic is None:
878            self.show_error('no such characteristic')
879            return
880
881        value = await characteristic.read_value()
882        self.append_to_output(f'VALUE: 0x{value.hex()}')
883
884    async def do_write(self, params):
885        if not self.connected_peer:
886            self.show_error('not connected')
887            return
888
889        if len(params) != 2:
890            self.show_error('invalid syntax', 'expected write <attribute> <value>')
891            return
892
893        if params[1].upper().startswith("0X"):
894            value = bytes.fromhex(params[1][2:])  # parse as hex string
895        else:
896            try:
897                value = int(params[1])  # try as integer
898            except ValueError:
899                value = str.encode(params[1])  # must be a string
900
901        characteristic = self.find_remote_characteristic(params[0])
902        if characteristic is None:
903            self.show_error('no such characteristic')
904            return
905
906        # use write with response if supported
907        with_response = characteristic.properties & Characteristic.Properties.WRITE
908        await characteristic.write_value(value, with_response=with_response)
909
910    async def do_local_write(self, params):
911        if len(params) != 2:
912            self.show_error(
913                'invalid syntax', 'expected local-write <attribute> <value>'
914            )
915            return
916
917        if params[1].upper().startswith("0X"):
918            value = bytes.fromhex(params[1][2:])  # parse as hex string
919        else:
920            try:
921                value = int(params[1]).to_bytes(2, "little")  # try as 2 byte integer
922            except ValueError:
923                value = str.encode(params[1])  # must be a string
924
925        attribute = self.find_local_attribute(params[0])
926        if not attribute:
927            self.show_error('invalid syntax', 'unable to find attribute')
928            return
929
930        # send data to any subscribers
931        if isinstance(attribute, Characteristic):
932            await attribute.write_value(None, value)
933            if attribute.has_properties(Characteristic.NOTIFY):
934                await self.device.gatt_server.notify_subscribers(attribute)
935            if attribute.has_properties(Characteristic.INDICATE):
936                await self.device.gatt_server.indicate_subscribers(attribute)
937
938    async def do_subscribe(self, params):
939        if not self.connected_peer:
940            self.show_error('not connected')
941            return
942
943        if len(params) != 1:
944            self.show_error('invalid syntax', 'expected subscribe <attribute>')
945            return
946
947        characteristic = self.find_remote_characteristic(params[0])
948        if characteristic is None:
949            self.show_error('no such characteristic')
950            return
951
952        await characteristic.subscribe(
953            lambda value: self.append_to_output(
954                f"{characteristic} VALUE: 0x{value.hex()}"
955            ),
956        )
957
958    async def do_unsubscribe(self, params):
959        if not self.connected_peer:
960            self.show_error('not connected')
961            return
962
963        if len(params) != 1:
964            self.show_error('invalid syntax', 'expected subscribe <attribute>')
965            return
966
967        characteristic = self.find_remote_characteristic(params[0])
968        if characteristic is None:
969            self.show_error('no such characteristic')
970            return
971
972        await characteristic.unsubscribe()
973
974    async def do_set_phy(self, params):
975        if len(params) != 1:
976            self.show_error(
977                'invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>'
978            )
979            return
980
981        if not self.connected_peer:
982            self.show_error('not connected')
983            return
984
985        if '/' in params[0]:
986            tx_phys, rx_phys = params[0].split('/')
987        else:
988            tx_phys = params[0]
989            rx_phys = tx_phys
990
991        await self.connected_peer.connection.set_phy(
992            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
993        )
994
995    async def do_set_default_phy(self, params):
996        if len(params) != 1:
997            self.show_error(
998                'invalid syntax',
999                'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>',
1000            )
1001            return
1002
1003        if '/' in params[0]:
1004            tx_phys, rx_phys = params[0].split('/')
1005        else:
1006            tx_phys = params[0]
1007            rx_phys = tx_phys
1008
1009        await self.device.set_default_phy(
1010            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
1011        )
1012
1013    async def do_exit(self, _):
1014        self.ui.exit()
1015
1016    async def do_quit(self, _):
1017        self.ui.exit()
1018
1019    async def do_filter(self, params):
1020        if params[0] == "address":
1021            if len(params) != 2:
1022                self.show_error('invalid syntax', 'expected filter address <pattern>')
1023                return
1024            self.device.listener.address_filter = params[1]
1025
1026
1027# -----------------------------------------------------------------------------
1028# Device and Connection Listener
1029# -----------------------------------------------------------------------------
1030class DeviceListener(Device.Listener, Connection.Listener):
1031    def __init__(self, app):
1032        self.app = app
1033        self.scan_results = OrderedDict()
1034        self.address_filter = None
1035
1036    @property
1037    def address_filter(self):
1038        return self._address_filter
1039
1040    @address_filter.setter
1041    def address_filter(self, filter_addr):
1042        if filter_addr is None:
1043            self._address_filter = re.compile(r".*")
1044        else:
1045            self._address_filter = re.compile(filter_addr)
1046        self.scan_results = OrderedDict(
1047            filter(self.filter_address_match, self.scan_results)
1048        )
1049        self.app.show_scan_results(self.scan_results)
1050
1051    def filter_address_match(self, address):
1052        """
1053        Returns true if an address matches the filter
1054        """
1055        return bool(self.address_filter.match(address))
1056
1057    @AsyncRunner.run_in_task()
1058    # pylint: disable=invalid-overridden-method
1059    async def on_connection(self, connection):
1060        self.app.connected_peer = Peer(connection)
1061        self.app.connection_rssi = None
1062        self.app.append_to_output(f'connected to {self.app.connected_peer}')
1063        connection.listener = self
1064
1065    def on_disconnection(self, reason):
1066        self.app.append_to_output(
1067            f'disconnected from {self.app.connected_peer}, '
1068            f'reason: {HCI_Constant.error_name(reason)}'
1069        )
1070        self.app.connected_peer = None
1071        self.app.connection_rssi = None
1072
1073    def on_connection_parameters_update(self):
1074        self.app.append_to_output(
1075            f'connection parameters update: '
1076            f'{self.app.connected_peer.connection.parameters}'
1077        )
1078
1079    def on_connection_phy_update(self):
1080        self.app.append_to_output(
1081            f'connection phy update: {self.app.connected_peer.connection.phy}'
1082        )
1083
1084    def on_connection_att_mtu_update(self):
1085        self.app.append_to_output(
1086            f'connection att mtu update: {self.app.connected_peer.connection.att_mtu}'
1087        )
1088
1089    def on_connection_encryption_change(self):
1090        encryption_state = (
1091            'encrypted'
1092            if self.app.connected_peer.connection.is_encrypted
1093            else 'not encrypted'
1094        )
1095        self.app.append_to_output(
1096            'connection encryption change: ' f'{encryption_state}'
1097        )
1098
1099    def on_connection_data_length_change(self):
1100        self.app.append_to_output(
1101            'connection data length change: '
1102            f'{self.app.connected_peer.connection.data_length}'
1103        )
1104
1105    def on_advertisement(self, advertisement):
1106        if not self.filter_address_match(str(advertisement.address)):
1107            return
1108
1109        entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
1110        entry = self.scan_results.get(entry_key)
1111        if entry:
1112            entry.ad_data = advertisement.data
1113            entry.rssi = advertisement.rssi
1114            entry.connectable = advertisement.is_connectable
1115        else:
1116            self.app.add_known_address(str(advertisement.address))
1117            self.scan_results[entry_key] = ScanResult(
1118                advertisement.address,
1119                advertisement.address.address_type,
1120                advertisement.data,
1121                advertisement.rssi,
1122                advertisement.is_connectable,
1123            )
1124
1125        self.app.show_scan_results(self.scan_results)
1126
1127
1128# -----------------------------------------------------------------------------
1129# Scanning
1130# -----------------------------------------------------------------------------
1131class ScanResult:
1132    def __init__(self, address, address_type, ad_data, rssi, connectable):
1133        self.address = address
1134        self.address_type = address_type
1135        self.ad_data = ad_data
1136        self.rssi = rssi
1137        self.connectable = connectable
1138
1139    def to_display_string(self):
1140        address_type_string = ('P', 'R', 'PI', 'RI')[self.address_type]
1141        address_color = colors.yellow if self.connectable else colors.red
1142        if address_type_string.startswith('P'):
1143            type_color = colors.green
1144        else:
1145            type_color = colors.cyan
1146
1147        name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
1148        if name is None:
1149            name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
1150        if name:
1151            # Convert to string
1152            try:
1153                name = name.decode()
1154            except UnicodeDecodeError:
1155                name = name.hex()
1156        else:
1157            name = ''
1158
1159        # Remove any '/P' qualifier suffix from the address string
1160        address_str = self.address.to_string(with_type_qualifier=False)
1161
1162        # RSSI bar
1163        bar_string = rssi_bar(self.rssi)
1164        bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
1165        return (
1166            f'{address_color(address_str)} [{type_color(address_type_string)}] '
1167            f'{bar_string} {bar_padding} {name}'
1168        )
1169
1170
1171# -----------------------------------------------------------------------------
1172# Logging
1173# -----------------------------------------------------------------------------
1174class LogHandler(logging.Handler):
1175    def __init__(self, app):
1176        super().__init__()
1177        self.app = app
1178        self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
1179
1180    def emit(self, record):
1181        message = self.format(record)
1182        self.app.append_to_log(message)
1183
1184
1185# -----------------------------------------------------------------------------
1186# Main
1187# -----------------------------------------------------------------------------
1188@click.command()
1189@click.option('--device-config', help='Device configuration file')
1190@click.argument('transport')
1191def main(device_config, transport):
1192    # Ensure that the BUMBLE_USER_DIR directory exists
1193    if not os.path.isdir(BUMBLE_USER_DIR):
1194        os.mkdir(BUMBLE_USER_DIR)
1195
1196    # Create an instance of the app
1197    app = ConsoleApp()
1198
1199    # Setup logging
1200    # logging.basicConfig(level = 'FATAL')
1201    # logging.basicConfig(level = 'DEBUG')
1202    root_logger = logging.getLogger()
1203
1204    root_logger.addHandler(LogHandler(app))
1205    root_logger.setLevel(logging.DEBUG)
1206
1207    # Run until the user exits
1208    asyncio.run(app.run_async(device_config, transport))
1209
1210
1211# -----------------------------------------------------------------------------
1212if __name__ == "__main__":
1213    main()  # pylint: disable=no-value-for-parameter
1214