1#!/usr/bin/env python3 2# Copyright 2024 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Flashes binaries to attached Raspberry Pi Pico boards.""" 16 17import argparse 18import logging 19import os 20from pathlib import Path 21import subprocess 22import sys 23import time 24 25import pw_cli.log 26from pw_cli.interactive_prompts import interactive_index_select 27 28from rp2040_utils import device_detector 29from rp2040_utils.device_detector import PicoBoardInfo, PicoDebugProbeBoardInfo 30 31_LOG = logging.getLogger() 32 33# If the script is being run through Bazel, our support binaries are provided 34# at well known locations in its runfiles. 35try: 36 from python.runfiles import runfiles # type: ignore 37 38 r = runfiles.Create() 39 _PROBE_RS_COMMAND = r.Rlocation('probe_rs/probe-rs') 40 _PICOTOOL_COMMAND = r.Rlocation('picotool/picotool') 41except ImportError: 42 _PROBE_RS_COMMAND = 'probe-rs' 43 _PICOTOOL_COMMAND = 'picotool' 44 45 46def flash(board_info: PicoBoardInfo, chip: str, binary: Path) -> bool: 47 """Load `binary` onto `board_info` and wait for the device to become 48 available. 49 50 Returns whether or not flashing was successful.""" 51 if isinstance(board_info, PicoDebugProbeBoardInfo): 52 return _load_debugprobe_binary(board_info, chip, binary) 53 if not _load_picotool_binary(board_info, binary): 54 return False 55 if not _wait_for_serial_port(board_info): 56 _LOG.error( 57 'Binary flashed but unable to connect to the serial port: %s', 58 board_info.serial_port, 59 ) 60 return False 61 _LOG.info( 62 'Successfully flashed Pico on bus %s, port %s, serial port %s', 63 board_info.bus, 64 board_info.port, 65 board_info.serial_port, 66 ) 67 return True 68 69 70def find_elf(binary: Path) -> Path | None: 71 """Attempt to find and return the path to an ELF file for a binary. 72 73 Args: 74 binary: A relative path to a binary. 75 76 Returns the path to the associated ELF file, or None if none was found. 77 """ 78 if binary.suffix == '.elf' or not binary.suffix: 79 return binary 80 choices = ( 81 binary.parent / f'{binary.stem}.elf', 82 binary.parent / 'bin' / f'{binary.stem}.elf', 83 binary.parent / 'test' / f'{binary.stem}.elf', 84 ) 85 for choice in choices: 86 if choice.is_file(): 87 return choice 88 89 _LOG.error( 90 'Cannot find ELF file to use as a token database for binary: %s', 91 binary, 92 ) 93 return None 94 95 96def _load_picotool_binary(board_info: PicoBoardInfo, binary: Path) -> bool: 97 """Flash a binary to this device using picotool, returning success or 98 failure.""" 99 cmd = [ 100 _PICOTOOL_COMMAND, 101 'load', 102 '-x', 103 # We use the absolute path since `cwd` is changed below. 104 str(binary.absolute()), 105 ] 106 107 # If the binary has not file extension, assume that it is ELF and 108 # explicitly tell `picotool` that. 109 if not binary.suffix: 110 cmd += ['-t', 'elf'] 111 112 cmd += [ 113 '--bus', 114 str(board_info.bus), 115 '--address', 116 str(board_info.address()), 117 '-F', 118 ] 119 120 _LOG.debug('Flashing ==> %s', ' '.join(cmd)) 121 122 # If the script is running inside Bazel, `picotool` needs to run from 123 # the project root so that it can find libusb. 124 cwd = None 125 if 'BUILD_WORKING_DIRECTORY' in os.environ: 126 cwd = os.environ['BUILD_WORKING_DIRECTORY'] 127 128 process = subprocess.run( 129 cmd, 130 stdout=subprocess.PIPE, 131 stderr=subprocess.STDOUT, 132 cwd=cwd, 133 ) 134 if process.returncode: 135 err = ( 136 'Flashing command failed: ' + ' '.join(cmd), 137 str(board_info), 138 process.stdout.decode('utf-8', errors='replace'), 139 ) 140 _LOG.error('\n\n'.join(err)) 141 return False 142 return True 143 144 145def _wait_for_serial_port(board_info: PicoBoardInfo) -> bool: 146 """Waits for a serial port to enumerate.""" 147 start_time = time.monotonic() 148 timeout_seconds = 10.0 149 while time.monotonic() - start_time < timeout_seconds: 150 # If the serial port path isn't known, watch for a newly 151 # enumerated path. 152 if board_info.serial_port is None: 153 # Wait a bit before checking for a new port. 154 time.sleep(0.3) 155 # Check for updated serial port path. 156 for device in device_detector.detect_boards(): 157 if ( 158 device.bus == board_info.bus 159 and device.port == board_info.port 160 ): 161 board_info.serial_port = device.serial_port 162 # Serial port found, break out of device for loop. 163 break 164 165 # Serial port known try to connect to it. 166 if board_info.serial_port is not None: 167 # Connect to the serial port. 168 try: 169 with open(board_info.serial_port, 'r+b', buffering=0): 170 return True 171 except (OSError, IOError): 172 _LOG.debug( 173 'Unable to connect to %s, retrying', board_info.serial_port 174 ) 175 time.sleep(0.1) 176 _LOG.error( 177 'Binary flashed but unable to connect to the serial port: %s', 178 board_info.serial_port, 179 ) 180 return False 181 182 183def _load_debugprobe_binary( 184 board_info: PicoDebugProbeBoardInfo, chip: str, binary: Path 185) -> bool: 186 """Flash a binary to this device using a debug probe, returning success 187 or failure.""" 188 elf_path = find_elf(binary) 189 if not elf_path: 190 return False 191 192 # `probe-rs` takes a `--probe` argument of the form: 193 # <vendor_id>:<product_id>:<serial_number> 194 probe = "{:04x}:{:04x}:{}".format( 195 board_info.vendor_id(), 196 board_info.device_id(), 197 board_info.serial_number, 198 ) 199 200 download_cmd = ( 201 _PROBE_RS_COMMAND, 202 'download', 203 '--probe', 204 probe, 205 '--chip', 206 chip, 207 '--speed', 208 '10000', 209 str(elf_path), 210 ) 211 _LOG.debug('Flashing ==> %s', ' '.join(download_cmd)) 212 process = subprocess.run( 213 download_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT 214 ) 215 if process.returncode: 216 err = ( 217 'Flashing command failed: ' + ' '.join(download_cmd), 218 str(board_info), 219 process.stdout.decode('utf-8', errors='replace'), 220 ) 221 _LOG.error('\n\n'.join(err)) 222 return False 223 224 # `probe-rs download` leaves the device halted so it needs to be reset 225 # to run. 226 reset_cmd = ( 227 _PROBE_RS_COMMAND, 228 'reset', 229 '--probe', 230 probe, 231 '--chip', 232 chip, 233 ) 234 _LOG.debug('Resetting ==> %s', ' '.join(reset_cmd)) 235 process = subprocess.run( 236 reset_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT 237 ) 238 if process.returncode: 239 err = ( 240 'Resetting command failed: ' + ' '.join(reset_cmd), 241 str(board_info), 242 process.stdout.decode('utf-8', errors='replace'), 243 ) 244 _LOG.error('\n\n'.join(err)) 245 return False 246 247 # Give time for the device to reset. Ideally the common unit test 248 # runner would wait for input but this is not the case. 249 time.sleep(0.5) 250 251 return True 252 253 254def create_flash_parser() -> argparse.ArgumentParser: 255 """Returns a parser for flashing command-line arguments.""" 256 parser = argparse.ArgumentParser(description=__doc__) 257 parser.add_argument('binary', type=Path, help='The target binary to flash') 258 parser.add_argument( 259 '--usb-bus', 260 type=int, 261 help='The bus this Pi Pico is on', 262 ) 263 parser.add_argument( 264 '--usb-port', 265 type=str, 266 help=( 267 'The port chain as a colon-separated list of integers of this Pi ' 268 'Pico on the specified USB bus (e.g. 1:4:2:2)' 269 ), 270 ) 271 parser.add_argument( 272 '-b', 273 '--baud', 274 type=int, 275 default=115200, 276 help='Baud rate to use for serial communication with target device', 277 ) 278 parser.add_argument( 279 '--debug-probe-only', 280 action='store_true', 281 help='Only flash on detected Pi Pico debug probes', 282 ) 283 parser.add_argument( 284 '--pico-only', 285 action='store_true', 286 help='Only flash on detected Pi Pico boards', 287 ) 288 parser.add_argument( 289 '--verbose', 290 '-v', 291 dest='verbose', 292 action='store_true', 293 help='Output additional logs as the script runs', 294 ) 295 parser.add_argument( 296 '--chip', 297 dest='chip', 298 type=str, 299 required=True, 300 choices=[ 301 'RP2040', 302 'RP2350', 303 ], 304 help='RP2 chip connected to a debug probe (RP2040 or RP2350)', 305 ) 306 307 return parser 308 309 310def device_from_args( 311 args: argparse.Namespace, interactive: bool 312) -> PicoBoardInfo: 313 """Select a PicoBoardInfo using the provided `args`. 314 315 This function will exit if no compatible board is discovered. 316 317 Args: 318 args: The parsed args namespace. This must be a set of arguments parsed 319 using `create_flash_parser`. 320 interactive: If true, multiple detected boards will result in a user 321 interaction to select which to use. If false, the first compatible 322 board will be used. 323 324 Returns: 325 Selected PicoBoardInfo. 326 """ 327 if args.pico_only and args.debug_probe_only: 328 _LOG.critical('Cannot specify both --pico-only and --debug-probe-only') 329 sys.exit(1) 330 331 # For now, require manual configurations to be fully specified. 332 if (args.usb_port is None) != (args.usb_bus is None): 333 _LOG.critical( 334 'Must specify BOTH --usb-bus and --usb-port when manually ' 335 'specifying a device' 336 ) 337 sys.exit(1) 338 339 if args.usb_bus and args.usb_port: 340 return device_detector.board_from_usb_port(args.usb_bus, args.usb_port) 341 342 _LOG.debug('Attempting to automatically detect dev board') 343 boards = device_detector.detect_boards( 344 include_picos=not args.debug_probe_only, 345 include_debug_probes=not args.pico_only, 346 ) 347 if not boards: 348 _LOG.error('Could not find an attached device') 349 sys.exit(1) 350 if len(boards) == 1: 351 _LOG.info('Only one device detected.') 352 return boards[0] 353 if not interactive: 354 _LOG.info( 355 'Interactive mode disabled. Defaulting to first discovered device.' 356 ) 357 return boards[0] 358 359 print('Multiple devices detected. Please select one:') 360 board_lines = list( 361 f'bus {board.bus}, port {board.port}' 362 f' ({board.manufacturer} - {board.product})' 363 for board in boards 364 ) 365 user_input_index, _user_input_text = interactive_index_select(board_lines) 366 return boards[user_input_index] 367 368 369def main(): 370 """Flash a binary.""" 371 args = create_flash_parser().parse_args() 372 log_level = logging.DEBUG if args.verbose else logging.INFO 373 pw_cli.log.install(level=log_level) 374 board = device_from_args(args, interactive=True) 375 _LOG.info('Flashing bus %s port %s', board.bus, board.port) 376 flashed = flash(board, args.chip, args.binary) 377 sys.exit(0 if flashed else 1) 378 379 380if __name__ == '__main__': 381 main() 382