#!/usr/bin/env python3
# Copyright 2019 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""This script flashes and runs unit tests on stm32f429i-disc1 boards."""

import argparse
import logging
import os
import subprocess
import sys
import threading

import coloredlogs  # type: ignore
import serial
from stm32f429i_disc1_utils import stm32f429i_detector

# Path used to access non-python resources in this python module.
_DIR = os.path.dirname(__file__)

# Path to default openocd configuration file.
_OPENOCD_CONFIG = os.path.join(_DIR, 'openocd_stm32f4xx.cfg')

# Path to scripts provided by openocd.
_OPENOCD_SCRIPTS_DIR = os.path.join(
    os.getenv('PW_PIGWEED_CIPD_INSTALL_DIR', ''), 'share', 'openocd', 'scripts'
)

_LOG = logging.getLogger('unit_test_runner')

# Verification of test pass/failure depends on these strings. If the formatting
# or output of the simple_printing_event_handler changes, this may need to be
# updated.
_TESTS_STARTING_STRING = b'[==========] Running all tests.'
_TESTS_DONE_STRING = b'[==========] Done running all tests.'
_TEST_FAILURE_STRING = b'[  FAILED  ]'

# How long to wait for the first byte of a test to be emitted. This is separate
# from the user-configurable timeout as there's a delay while the device is
# flashed.
_FLASH_TIMEOUT = 5.0


class TestingFailure(Exception):
    """A simple exception to be raised when a testing step fails."""


class DeviceNotFound(Exception):
    """A simple exception to be raised when unable to connect to a device."""


def parse_args():
    """Parses command-line arguments.

    Returns:
        The argparse.Namespace holding `binary`, `openocd_config`,
        `stlink_serial`, `port`, `baud`, `test_timeout` and `verbose`.
    """

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('binary', help='The target test binary to run')
    parser.add_argument(
        '--openocd-config',
        default=_OPENOCD_CONFIG,
        help='Path to openocd configuration file',
    )
    parser.add_argument(
        '--stlink-serial',
        default=None,
        help='The serial number of the stlink to use when '
        'flashing the target device',
    )
    parser.add_argument(
        '--port',
        default=None,
        help='The name of the serial port to connect to when running tests',
    )
    parser.add_argument(
        '--baud',
        type=int,
        default=115200,
        help='Target baud rate to use for serial communication'
        ' with target device',
    )
    parser.add_argument(
        '--test-timeout',
        type=float,
        default=5.0,
        help='Maximum communication delay in seconds before a '
        'test is considered unresponsive and aborted',
    )
    parser.add_argument(
        '--verbose',
        '-v',
        dest='verbose',
        action="store_true",
        help='Output additional logs as the script runs',
    )

    return parser.parse_args()


def log_subprocess_output(level, output):
    """Logs subprocess output line-by-line.

    Args:
        level: logging level (e.g. logging.DEBUG) used for every line.
        output: raw bytes to log; undecodable bytes are replaced rather than
            raising.
    """

    for line in output.decode('utf-8', errors='replace').splitlines():
        _LOG.log(level, line)


def _run_openocd(commands, openocd_config, stlink_serial, failure_message):
    """Runs openocd with the given `-c` commands and logs its output.

    Args:
        commands: iterable of openocd command strings, each passed via `-c`.
        openocd_config: path to the openocd configuration file (`-f`).
        stlink_serial: optional stlink serial number exported to openocd
            through the PW_STLINK_SERIAL environment variable.
        failure_message: message of the TestingFailure raised on failure.

    Raises:
        TestingFailure: if openocd exits with a non-zero return code.
    """

    # Name/path of openocd; may be overridden through the environment.
    flash_tool = os.getenv('OPENOCD_PATH', 'openocd')

    cmd = [flash_tool, '-s', _OPENOCD_SCRIPTS_DIR, '-f', openocd_config]
    for command in commands:
        cmd.extend(('-c', command))

    env = os.environ.copy()
    if stlink_serial:
        env['PW_STLINK_SERIAL'] = stlink_serial

    # Disable GDB port to support multi-device testing.
    env['PW_GDB_PORT'] = 'disabled'
    process = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=env,
        check=False,
    )
    if process.returncode:
        log_subprocess_output(logging.ERROR, process.stdout)
        raise TestingFailure(failure_message)

    log_subprocess_output(logging.DEBUG, process.stdout)


def reset_device(openocd_config, stlink_serial):
    """Uses openocd to reset the attached device.

    Raises:
        TestingFailure: if openocd reports a failure.
    """

    _LOG.debug('Resetting device')
    _run_openocd(
        ('init', 'reset run', 'exit'),
        openocd_config,
        stlink_serial,
        'Failed to reset target device',
    )
    _LOG.debug('Successfully reset device')


def read_serial(port, baud_rate, test_timeout) -> bytes:
    """Reads lines from a serial port until a line read times out.

    The first byte is awaited for up to _FLASH_TIMEOUT seconds; subsequent
    lines are awaited for `test_timeout` seconds each.

    Returns bytes object containing the read serial data, trimmed to start at
    the most recent test-start marker when one is present.

    Raises:
        TestingFailure: if the port is not open or the device stays silent.
    """

    serial_data = bytearray()
    device = serial.Serial(
        baudrate=baud_rate, port=port, timeout=_FLASH_TIMEOUT
    )
    # Always release the port, including on the early-failure paths below;
    # otherwise the handle stays open for the lifetime of the process.
    try:
        if not device.is_open:
            raise TestingFailure('Failed to open device')

        # Discard any stale input so only output from this run is captured.
        device.reset_input_buffer()

        # Block and wait for the first byte.
        serial_data += device.read()
        if not serial_data:
            raise TestingFailure('Device not producing output')

        device.timeout = test_timeout

        # Read with a reasonable timeout until we stop getting characters.
        while True:
            bytes_read = device.readline()
            if not bytes_read:
                break
            serial_data += bytes_read
            if _TESTS_DONE_STRING in serial_data:
                # Set to much more aggressive timeout since the last one or
                # two lines should print out immediately. (one line if all
                # fails or all passes, two lines if mixed.)
                device.timeout = 0.01
    finally:
        device.close()

    # Remove carriage returns.
    serial_data = serial_data.replace(b'\r', b'')

    # Try to trim captured results to only contain most recent test run.
    test_start_index = serial_data.rfind(_TESTS_STARTING_STRING)
    if test_start_index != -1:
        serial_data = serial_data[test_start_index:]

    # Convert so the returned object matches the declared return type.
    return bytes(serial_data)


def flash_device(binary, openocd_config, stlink_serial):
    """Flash binary to a connected device using the provided configuration.

    Raises:
        TestingFailure: if openocd reports a failure.
    """

    _LOG.info('Flashing firmware to device')
    _run_openocd(
        (' '.join(['program', binary, 'reset', 'exit']),),
        openocd_config,
        stlink_serial,
        'Failed to flash target device',
    )
    _LOG.debug('Successfully flashed firmware to device')


def handle_test_results(test_output):
    """Parses test output to determine whether tests passed or failed.

    Raises:
        TestingFailure: if the start marker is missing, the done marker is
            missing, or any failure marker is present.
    """

    if _TESTS_STARTING_STRING not in test_output:
        raise TestingFailure('Failed to find test start')

    if _TESTS_DONE_STRING not in test_output:
        log_subprocess_output(logging.INFO, test_output)
        raise TestingFailure('Tests did not complete')

    if _TEST_FAILURE_STRING in test_output:
        log_subprocess_output(logging.INFO, test_output)
        raise TestingFailure('Test suite had one or more failures')

    log_subprocess_output(logging.DEBUG, test_output)

    _LOG.info('Test passed!')


def _threaded_test_reader(dest, port, baud_rate, test_timeout):
    """Appends captured test output to the mutable "dest" list.

    Exceptions raised in a worker thread never reach the thread that started
    it, so read failures are logged here and signalled to the caller by
    leaving "dest" empty.
    """
    try:
        dest.append(read_serial(port, baud_rate, test_timeout))
    except (TestingFailure, serial.SerialException) as err:
        _LOG.error('Failed to read test output: %s', err)


def run_device_test(
    binary, test_timeout, openocd_config, baud, stlink_serial=None, port=None
) -> bool:
    """Flashes, runs, and checks an on-device test binary.

    If neither `stlink_serial` nor `port` is given, the first detected board
    is used.

    Returns true on test pass.

    Raises:
        DeviceNotFound: if auto-detection finds no attached board.
    """

    if stlink_serial is None and port is None:
        _LOG.debug('Attempting to automatically detect dev board')
        boards = stm32f429i_detector.detect_boards()
        if not boards:
            error = 'Could not find an attached device'
            _LOG.error(error)
            raise DeviceNotFound(error)
        stlink_serial = boards[0].serial_number
        port = boards[0].dev_name

    _LOG.debug('Launching test binary %s', binary)
    try:
        # Begin capturing test output via another thread BEFORE flashing the
        # device since the test will automatically run after the image is
        # flashed. This reduces flake since there isn't a need to time a reset
        # correctly relative to the start of capturing device output.
        result: list[bytes] = []
        read_thread = threading.Thread(
            target=_threaded_test_reader,
            args=(result, port, baud, test_timeout),
        )
        read_thread.start()
        _LOG.info('Running test')
        try:
            flash_device(binary, openocd_config, stlink_serial)
        finally:
            # Reap the reader even when flashing fails so the serial port is
            # released before this function returns.
            read_thread.join()

        # An empty result means the reader thread failed (port could not be
        # opened, or the device stayed silent). That must not count as a
        # pass.
        if not result:
            raise TestingFailure('No test output was captured from the device')
        handle_test_results(result[0])
    except TestingFailure as err:
        _LOG.error(err)
        return False

    return True


def main():
    """Set up runner, and then flash/run device test."""
    args = parse_args()

    # Try to use pw_cli logs, else default to something reasonable.
    try:
        import pw_cli.log  # pylint: disable=import-outside-toplevel

        log_level = logging.DEBUG if args.verbose else logging.INFO
        pw_cli.log.install(level=log_level)
    except ImportError:
        coloredlogs.install(
            level='DEBUG' if args.verbose else 'INFO',
            level_styles={'debug': {'color': 244}, 'error': {'color': 'red'}},
            fmt='%(asctime)s %(levelname)s | %(message)s',
        )

    passed = run_device_test(
        args.binary,
        args.test_timeout,
        args.openocd_config,
        args.baud,
        args.stlink_serial,
        args.port,
    )
    # Exit status 0 on pass, 1 on any test or infrastructure failure.
    sys.exit(0 if passed else 1)


if __name__ == '__main__':
    main()