xref: /aosp_15_r20/external/pigweed/targets/rp2040/py/rp2040_utils/flasher.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2024 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Flashes binaries to attached Raspberry Pi Pico boards."""
16
17import argparse
18import logging
19import os
20from pathlib import Path
21import subprocess
22import sys
23import time
24
25import pw_cli.log
26from pw_cli.interactive_prompts import interactive_index_select
27
28from rp2040_utils import device_detector
29from rp2040_utils.device_detector import PicoBoardInfo, PicoDebugProbeBoardInfo
30
31_LOG = logging.getLogger()
32
33# If the script is being run through Bazel, our support binaries are provided
34# at well known locations in its runfiles.
35try:
36    from python.runfiles import runfiles  # type: ignore
37
38    r = runfiles.Create()
39    _PROBE_RS_COMMAND = r.Rlocation('probe_rs/probe-rs')
40    _PICOTOOL_COMMAND = r.Rlocation('picotool/picotool')
41except ImportError:
42    _PROBE_RS_COMMAND = 'probe-rs'
43    _PICOTOOL_COMMAND = 'picotool'
44
45
46def flash(board_info: PicoBoardInfo, chip: str, binary: Path) -> bool:
47    """Load `binary` onto `board_info` and wait for the device to become
48    available.
49
50    Returns whether or not flashing was successful."""
51    if isinstance(board_info, PicoDebugProbeBoardInfo):
52        return _load_debugprobe_binary(board_info, chip, binary)
53    if not _load_picotool_binary(board_info, binary):
54        return False
55    if not _wait_for_serial_port(board_info):
56        _LOG.error(
57            'Binary flashed but unable to connect to the serial port: %s',
58            board_info.serial_port,
59        )
60        return False
61    _LOG.info(
62        'Successfully flashed Pico on bus %s, port %s, serial port %s',
63        board_info.bus,
64        board_info.port,
65        board_info.serial_port,
66    )
67    return True
68
69
70def find_elf(binary: Path) -> Path | None:
71    """Attempt to find and return the path to an ELF file for a binary.
72
73    Args:
74      binary: A relative path to a binary.
75
76    Returns the path to the associated ELF file, or None if none was found.
77    """
78    if binary.suffix == '.elf' or not binary.suffix:
79        return binary
80    choices = (
81        binary.parent / f'{binary.stem}.elf',
82        binary.parent / 'bin' / f'{binary.stem}.elf',
83        binary.parent / 'test' / f'{binary.stem}.elf',
84    )
85    for choice in choices:
86        if choice.is_file():
87            return choice
88
89    _LOG.error(
90        'Cannot find ELF file to use as a token database for binary: %s',
91        binary,
92    )
93    return None
94
95
96def _load_picotool_binary(board_info: PicoBoardInfo, binary: Path) -> bool:
97    """Flash a binary to this device using picotool, returning success or
98    failure."""
99    cmd = [
100        _PICOTOOL_COMMAND,
101        'load',
102        '-x',
103        # We use the absolute path since `cwd` is changed below.
104        str(binary.absolute()),
105    ]
106
107    # If the binary has not file extension, assume that it is ELF and
108    # explicitly tell `picotool` that.
109    if not binary.suffix:
110        cmd += ['-t', 'elf']
111
112    cmd += [
113        '--bus',
114        str(board_info.bus),
115        '--address',
116        str(board_info.address()),
117        '-F',
118    ]
119
120    _LOG.debug('Flashing ==> %s', ' '.join(cmd))
121
122    # If the script is running inside Bazel, `picotool` needs to run from
123    # the project root so that it can find libusb.
124    cwd = None
125    if 'BUILD_WORKING_DIRECTORY' in os.environ:
126        cwd = os.environ['BUILD_WORKING_DIRECTORY']
127
128    process = subprocess.run(
129        cmd,
130        stdout=subprocess.PIPE,
131        stderr=subprocess.STDOUT,
132        cwd=cwd,
133    )
134    if process.returncode:
135        err = (
136            'Flashing command failed: ' + ' '.join(cmd),
137            str(board_info),
138            process.stdout.decode('utf-8', errors='replace'),
139        )
140        _LOG.error('\n\n'.join(err))
141        return False
142    return True
143
144
145def _wait_for_serial_port(board_info: PicoBoardInfo) -> bool:
146    """Waits for a serial port to enumerate."""
147    start_time = time.monotonic()
148    timeout_seconds = 10.0
149    while time.monotonic() - start_time < timeout_seconds:
150        # If the serial port path isn't known, watch for a newly
151        # enumerated path.
152        if board_info.serial_port is None:
153            # Wait a bit before checking for a new port.
154            time.sleep(0.3)
155            # Check for updated serial port path.
156            for device in device_detector.detect_boards():
157                if (
158                    device.bus == board_info.bus
159                    and device.port == board_info.port
160                ):
161                    board_info.serial_port = device.serial_port
162                    # Serial port found, break out of device for loop.
163                    break
164
165        # Serial port known try to connect to it.
166        if board_info.serial_port is not None:
167            # Connect to the serial port.
168            try:
169                with open(board_info.serial_port, 'r+b', buffering=0):
170                    return True
171            except (OSError, IOError):
172                _LOG.debug(
173                    'Unable to connect to %s, retrying', board_info.serial_port
174                )
175                time.sleep(0.1)
176    _LOG.error(
177        'Binary flashed but unable to connect to the serial port: %s',
178        board_info.serial_port,
179    )
180    return False
181
182
183def _load_debugprobe_binary(
184    board_info: PicoDebugProbeBoardInfo, chip: str, binary: Path
185) -> bool:
186    """Flash a binary to this device using a debug probe, returning success
187    or failure."""
188    elf_path = find_elf(binary)
189    if not elf_path:
190        return False
191
192    # `probe-rs` takes a `--probe` argument of the form:
193    #  <vendor_id>:<product_id>:<serial_number>
194    probe = "{:04x}:{:04x}:{}".format(
195        board_info.vendor_id(),
196        board_info.device_id(),
197        board_info.serial_number,
198    )
199
200    download_cmd = (
201        _PROBE_RS_COMMAND,
202        'download',
203        '--probe',
204        probe,
205        '--chip',
206        chip,
207        '--speed',
208        '10000',
209        str(elf_path),
210    )
211    _LOG.debug('Flashing ==> %s', ' '.join(download_cmd))
212    process = subprocess.run(
213        download_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
214    )
215    if process.returncode:
216        err = (
217            'Flashing command failed: ' + ' '.join(download_cmd),
218            str(board_info),
219            process.stdout.decode('utf-8', errors='replace'),
220        )
221        _LOG.error('\n\n'.join(err))
222        return False
223
224    # `probe-rs download` leaves the device halted so it needs to be reset
225    # to run.
226    reset_cmd = (
227        _PROBE_RS_COMMAND,
228        'reset',
229        '--probe',
230        probe,
231        '--chip',
232        chip,
233    )
234    _LOG.debug('Resetting ==> %s', ' '.join(reset_cmd))
235    process = subprocess.run(
236        reset_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
237    )
238    if process.returncode:
239        err = (
240            'Resetting command failed: ' + ' '.join(reset_cmd),
241            str(board_info),
242            process.stdout.decode('utf-8', errors='replace'),
243        )
244        _LOG.error('\n\n'.join(err))
245        return False
246
247    # Give time for the device to reset. Ideally the common unit test
248    # runner would wait for input but this is not the case.
249    time.sleep(0.5)
250
251    return True
252
253
254def create_flash_parser() -> argparse.ArgumentParser:
255    """Returns a parser for flashing command-line arguments."""
256    parser = argparse.ArgumentParser(description=__doc__)
257    parser.add_argument('binary', type=Path, help='The target binary to flash')
258    parser.add_argument(
259        '--usb-bus',
260        type=int,
261        help='The bus this Pi Pico is on',
262    )
263    parser.add_argument(
264        '--usb-port',
265        type=str,
266        help=(
267            'The port chain as a colon-separated list of integers of this Pi '
268            'Pico on the specified USB bus (e.g. 1:4:2:2)'
269        ),
270    )
271    parser.add_argument(
272        '-b',
273        '--baud',
274        type=int,
275        default=115200,
276        help='Baud rate to use for serial communication with target device',
277    )
278    parser.add_argument(
279        '--debug-probe-only',
280        action='store_true',
281        help='Only flash on detected Pi Pico debug probes',
282    )
283    parser.add_argument(
284        '--pico-only',
285        action='store_true',
286        help='Only flash on detected Pi Pico boards',
287    )
288    parser.add_argument(
289        '--verbose',
290        '-v',
291        dest='verbose',
292        action='store_true',
293        help='Output additional logs as the script runs',
294    )
295    parser.add_argument(
296        '--chip',
297        dest='chip',
298        type=str,
299        required=True,
300        choices=[
301            'RP2040',
302            'RP2350',
303        ],
304        help='RP2 chip connected to a debug probe (RP2040 or RP2350)',
305    )
306
307    return parser
308
309
310def device_from_args(
311    args: argparse.Namespace, interactive: bool
312) -> PicoBoardInfo:
313    """Select a PicoBoardInfo using the provided `args`.
314
315    This function will exit if no compatible board is discovered.
316
317    Args:
318        args: The parsed args namespace. This must be a set of arguments parsed
319            using `create_flash_parser`.
320        interactive: If true, multiple detected boards will result in a user
321            interaction to select which to use. If false, the first compatible
322            board will be used.
323
324    Returns:
325        Selected PicoBoardInfo.
326    """
327    if args.pico_only and args.debug_probe_only:
328        _LOG.critical('Cannot specify both --pico-only and --debug-probe-only')
329        sys.exit(1)
330
331    # For now, require manual configurations to be fully specified.
332    if (args.usb_port is None) != (args.usb_bus is None):
333        _LOG.critical(
334            'Must specify BOTH --usb-bus and --usb-port when manually '
335            'specifying a device'
336        )
337        sys.exit(1)
338
339    if args.usb_bus and args.usb_port:
340        return device_detector.board_from_usb_port(args.usb_bus, args.usb_port)
341
342    _LOG.debug('Attempting to automatically detect dev board')
343    boards = device_detector.detect_boards(
344        include_picos=not args.debug_probe_only,
345        include_debug_probes=not args.pico_only,
346    )
347    if not boards:
348        _LOG.error('Could not find an attached device')
349        sys.exit(1)
350    if len(boards) == 1:
351        _LOG.info('Only one device detected.')
352        return boards[0]
353    if not interactive:
354        _LOG.info(
355            'Interactive mode disabled. Defaulting to first discovered device.'
356        )
357        return boards[0]
358
359    print('Multiple devices detected. Please select one:')
360    board_lines = list(
361        f'bus {board.bus}, port {board.port}'
362        f' ({board.manufacturer} - {board.product})'
363        for board in boards
364    )
365    user_input_index, _user_input_text = interactive_index_select(board_lines)
366    return boards[user_input_index]
367
368
369def main():
370    """Flash a binary."""
371    args = create_flash_parser().parse_args()
372    log_level = logging.DEBUG if args.verbose else logging.INFO
373    pw_cli.log.install(level=log_level)
374    board = device_from_args(args, interactive=True)
375    _LOG.info('Flashing bus %s port %s', board.bus, board.port)
376    flashed = flash(board, args.chip, args.binary)
377    sys.exit(0 if flashed else 1)
378
379
380if __name__ == '__main__':
381    main()
382