1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import logging
19import asyncio
20import os
21
22import click
23
24from bumble import transport
25from bumble.host import Host
26from bumble.drivers import rtk
27
28# -----------------------------------------------------------------------------
29# Logging
30# -----------------------------------------------------------------------------
31logger = logging.getLogger(__name__)
32
33
34# -----------------------------------------------------------------------------
35def do_parse(firmware_path):
36    with open(firmware_path, 'rb') as firmware_file:
37        firmware_data = firmware_file.read()
38        firmware = rtk.Firmware(firmware_data)
39        print(
40            f"Firmware: version=0x{firmware.version:08X} "
41            f"project_id=0x{firmware.project_id:04X}"
42        )
43        for patch in firmware.patches:
44            print(
45                f"  Patch: chip_id=0x{patch[0]:04X}, "
46                f"{len(patch[1])} bytes, "
47                f"SVN Version={patch[2]:08X}"
48            )
49
50
51# -----------------------------------------------------------------------------
52async def do_load(usb_transport, force):
53    async with await transport.open_transport_or_link(usb_transport) as (
54        hci_source,
55        hci_sink,
56    ):
57        # Create a host to communicate with the device
58        host = Host(hci_source, hci_sink)
59        await host.reset(driver_factory=None)
60
61        # Get the driver.
62        driver = await rtk.Driver.for_host(host, force)
63        if driver is None:
64            print("Firmware already loaded or no supported driver for this device.")
65            return
66
67        await driver.download_firmware()
68
69
70# -----------------------------------------------------------------------------
71async def do_drop(usb_transport):
72    async with await transport.open_transport_or_link(usb_transport) as (
73        hci_source,
74        hci_sink,
75    ):
76        # Create a host to communicate with the device
77        host = Host(hci_source, hci_sink)
78        await host.reset(driver_factory=None)
79
80        # Tell the device to reset/drop any loaded patch
81        await rtk.Driver.drop_firmware(host)
82
83
84# -----------------------------------------------------------------------------
85async def do_info(usb_transport, force):
86    async with await transport.open_transport(usb_transport) as (
87        hci_source,
88        hci_sink,
89    ):
90        # Create a host to communicate with the device
91        host = Host(hci_source, hci_sink)
92        await host.reset(driver_factory=None)
93
94        # Check if this is a supported device.
95        if not force and not rtk.Driver.check(host):
96            print("USB device not supported by this RTK driver")
97            return
98
99        # Get the driver info.
100        driver_info = await rtk.Driver.driver_info_for_host(host)
101        if driver_info:
102            print(
103                "Driver:\n"
104                f"  ROM:      {driver_info.rom:04X}\n"
105                f"  Firmware: {driver_info.fw_name}\n"
106                f"  Config:   {driver_info.config_name}\n"
107            )
108        else:
109            print("Firmware already loaded or no supported driver for this device.")
110
111
112# -----------------------------------------------------------------------------
113@click.group()
114def main():
115    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
116
117
118@main.command
119@click.argument("firmware_path")
120def parse(firmware_path):
121    """Parse a firmware image."""
122    do_parse(firmware_path)
123
124
125@main.command
126@click.argument("usb_transport")
127@click.option(
128    "--force",
129    is_flag=True,
130    default=False,
131    help="Load even if the USB info doesn't match",
132)
133def load(usb_transport, force):
134    """Load a firmware image into the USB dongle."""
135    asyncio.run(do_load(usb_transport, force))
136
137
138@main.command
139@click.argument("usb_transport")
140def drop(usb_transport):
141    """Drop a firmware image from the USB dongle."""
142    asyncio.run(do_drop(usb_transport))
143
144
145@main.command
146@click.argument("usb_transport")
147@click.option(
148    "--force",
149    is_flag=True,
150    default=False,
151    help="Try to get the device info even if the USB info doesn't match",
152)
153def info(usb_transport, force):
154    """Get the firmware info from a USB dongle."""
155    asyncio.run(do_info(usb_transport, force))
156
157
158# -----------------------------------------------------------------------------
159if __name__ == '__main__':
160    main()
161