1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import logging
19import asyncio
20import sys
21import os
22from bumble.colors import color
23from bumble.device import Device
24from bumble.controller import Controller
25from bumble.hci import Address
26from bumble.link import LocalLink
27from bumble.transport import open_transport_or_link
28
29
30# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
    """Device listener that prints every received advertisement to stdout."""

    def on_advertisement(self, advertisement):
        """Print a one-line, colorized summary of `advertisement`.

        The line contains the advertiser's address, a short tag derived from
        the address type, the RSSI, and the advertising data.
        """
        # The numeric address type is used as an index into the tag table.
        type_tag = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]

        # Connectable advertisers are shown in yellow, all others in red.
        address_color = 'yellow' if advertisement.is_connectable else 'red'
        # Tags that start with 'P' are shown in green, the rest in cyan.
        type_color = 'green' if type_tag.startswith('P') else 'cyan'

        shown_address = color(advertisement.address, address_color)
        shown_type = color(type_tag, type_color)
        print(
            f'>>> {shown_address} [{shown_type}]: '
            f'RSSI={advertisement.rssi}, {advertisement.data}'
        )
45
46
47# -----------------------------------------------------------------------------
async def main() -> None:
    """Attach a virtual controller to an HCI transport and scan on its link.

    Expects exactly one command-line argument: the transport spec handed to
    `open_transport_or_link`. When the argument count is wrong, a usage
    message is printed and the function returns without opening anything.

    Otherwise two controllers are created on one shared `LocalLink`: 'C1' is
    wired to the transport's source/sink, and 'C2' backs a local `Device`
    that has a `ScannerListener` installed and is put into scanning mode.
    The function then waits until the transport source terminates.
    """
    if len(sys.argv) != 2:
        # The usage line and the example line must name the same script.
        print('Usage: run_controller_with_scanner.py <transport-spec>')
        print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
        return

    print('>>> connecting to HCI...')
    async with await open_transport_or_link(sys.argv[1]) as hci_transport:
        print('>>> connected')

        # Create a local link
        link = LocalLink()

        # Create a first controller using the packet source/sink as its host interface
        controller1 = Controller(
            'C1',
            host_source=hci_transport.source,
            host_sink=hci_transport.sink,
            link=link,
            public_address='E0:E1:E2:E3:E4:E5',
        )

        # Create a second controller using the same link
        controller2 = Controller('C2', link=link)

        # Create a device with a scanner listener; controller2 is passed for
        # both of the trailing HCI arguments of `Device.with_hci`.
        device = Device.with_hci(
            'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
        )
        device.listener = ScannerListener()
        await device.power_on()
        await device.start_scanning()

        # Keep running until the transport's source side terminates.
        await hci_transport.source.wait_for_termination()
82
83
84# -----------------------------------------------------------------------------
# The log level is read from the BUMBLE_LOGLEVEL environment variable
# (upper-cased), falling back to DEBUG when the variable is not set.
log_level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()
logging.basicConfig(level=log_level)

# Run the main() coroutine to completion.
asyncio.run(main())
87