xref: /aosp_15_r20/external/pigweed/targets/stm32f429i_disc1/py/stm32f429i_disc1_utils/unit_test_runner.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2019 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""This script flashes and runs unit tests on stm32f429i-disc1 boards."""
16
17import argparse
18import logging
19import os
20import subprocess
21import sys
22import threading
23
24import coloredlogs  # type: ignore
25import serial
26from stm32f429i_disc1_utils import stm32f429i_detector
27
28# Path used to access non-python resources in this python module.
29_DIR = os.path.dirname(__file__)
30
31# Path to default openocd configuration file.
32_OPENOCD_CONFIG = os.path.join(_DIR, 'openocd_stm32f4xx.cfg')
33
34# Path to scripts provided by openocd.
35_OPENOCD_SCRIPTS_DIR = os.path.join(
36    os.getenv('PW_PIGWEED_CIPD_INSTALL_DIR', ''), 'share', 'openocd', 'scripts'
37)
38
39_LOG = logging.getLogger('unit_test_runner')
40
41# Verification of test pass/failure depends on these strings. If the formatting
42# or output of the simple_printing_event_handler changes, this may need to be
43# updated.
44_TESTS_STARTING_STRING = b'[==========] Running all tests.'
45_TESTS_DONE_STRING = b'[==========] Done running all tests.'
46_TEST_FAILURE_STRING = b'[  FAILED  ]'
47
48# How long to wait for the first byte of a test to be emitted. This is longer
49# than the user-configurable timeout as there's a delay while the device is
50# flashed.
51_FLASH_TIMEOUT = 5.0
52
53
54class TestingFailure(Exception):
55    """A simple exception to be raised when a testing step fails."""
56
57
58class DeviceNotFound(Exception):
59    """A simple exception to be raised when unable to connect to a device."""
60
61
62def parse_args():
63    """Parses command-line arguments."""
64
65    parser = argparse.ArgumentParser(description=__doc__)
66    parser.add_argument('binary', help='The target test binary to run')
67    parser.add_argument(
68        '--openocd-config',
69        default=_OPENOCD_CONFIG,
70        help='Path to openocd configuration file',
71    )
72    parser.add_argument(
73        '--stlink-serial',
74        default=None,
75        help='The serial number of the stlink to use when '
76        'flashing the target device',
77    )
78    parser.add_argument(
79        '--port',
80        default=None,
81        help='The name of the serial port to connect to when ' 'running tests',
82    )
83    parser.add_argument(
84        '--baud',
85        type=int,
86        default=115200,
87        help='Target baud rate to use for serial communication'
88        ' with target device',
89    )
90    parser.add_argument(
91        '--test-timeout',
92        type=float,
93        default=5.0,
94        help='Maximum communication delay in seconds before a '
95        'test is considered unresponsive and aborted',
96    )
97    parser.add_argument(
98        '--verbose',
99        '-v',
100        dest='verbose',
101        action="store_true",
102        help='Output additional logs as the script runs',
103    )
104
105    return parser.parse_args()
106
107
108def log_subprocess_output(level, output):
109    """Logs subprocess output line-by-line."""
110
111    lines = output.decode('utf-8', errors='replace').splitlines()
112    for line in lines:
113        _LOG.log(level, line)
114
115
116def reset_device(openocd_config, stlink_serial):
117    """Uses openocd to reset the attached device."""
118
119    # Name/path of openocd.
120    default_flasher = 'openocd'
121    flash_tool = os.getenv('OPENOCD_PATH', default_flasher)
122
123    cmd = [
124        flash_tool,
125        '-s',
126        _OPENOCD_SCRIPTS_DIR,
127        '-f',
128        openocd_config,
129        '-c',
130        'init',
131        '-c',
132        'reset run',
133        '-c',
134        'exit',
135    ]
136    _LOG.debug('Resetting device')
137
138    env = os.environ.copy()
139    if stlink_serial:
140        env['PW_STLINK_SERIAL'] = stlink_serial
141
142    # Disable GDB port to support multi-device testing.
143    env['PW_GDB_PORT'] = 'disabled'
144    process = subprocess.run(
145        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env
146    )
147    if process.returncode:
148        log_subprocess_output(logging.ERROR, process.stdout)
149        raise TestingFailure('Failed to reset target device')
150
151    log_subprocess_output(logging.DEBUG, process.stdout)
152
153    _LOG.debug('Successfully reset device')
154
155
156def read_serial(port, baud_rate, test_timeout) -> bytes:
157    """Reads lines from a serial port until a line read times out.
158
159    Returns bytes object containing the read serial data.
160    """
161
162    serial_data = bytearray()
163    device = serial.Serial(
164        baudrate=baud_rate, port=port, timeout=_FLASH_TIMEOUT
165    )
166    if not device.is_open:
167        raise TestingFailure('Failed to open device')
168
169    # Flush input buffer and reset the device to begin the test.
170    device.reset_input_buffer()
171
172    # Block and wait for the first byte.
173    serial_data += device.read()
174    if not serial_data:
175        raise TestingFailure('Device not producing output')
176
177    device.timeout = test_timeout
178
179    # Read with a reasonable timeout until we stop getting characters.
180    while True:
181        bytes_read = device.readline()
182        if not bytes_read:
183            break
184        serial_data += bytes_read
185        if serial_data.rfind(_TESTS_DONE_STRING) != -1:
186            # Set to much more aggressive timeout since the last one or two
187            # lines should print out immediately. (one line if all fails or all
188            # passes, two lines if mixed.)
189            device.timeout = 0.01
190
191    # Remove carriage returns.
192    serial_data = serial_data.replace(b'\r', b'')
193
194    # Try to trim captured results to only contain most recent test run.
195    test_start_index = serial_data.rfind(_TESTS_STARTING_STRING)
196    return (
197        serial_data
198        if test_start_index == -1
199        else serial_data[test_start_index:]
200    )
201
202
203def flash_device(binary, openocd_config, stlink_serial):
204    """Flash binary to a connected device using the provided configuration."""
205
206    # Name/path of openocd.
207    default_flasher = 'openocd'
208    flash_tool = os.getenv('OPENOCD_PATH', default_flasher)
209
210    openocd_command = ' '.join(['program', binary, 'reset', 'exit'])
211    cmd = [
212        flash_tool,
213        '-s',
214        _OPENOCD_SCRIPTS_DIR,
215        '-f',
216        openocd_config,
217        '-c',
218        openocd_command,
219    ]
220    _LOG.info('Flashing firmware to device')
221
222    env = os.environ.copy()
223    if stlink_serial:
224        env['PW_STLINK_SERIAL'] = stlink_serial
225
226    # Disable GDB port to support multi-device testing.
227    env['PW_GDB_PORT'] = 'disabled'
228    process = subprocess.run(
229        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env
230    )
231    if process.returncode:
232        log_subprocess_output(logging.ERROR, process.stdout)
233        raise TestingFailure('Failed to flash target device')
234
235    log_subprocess_output(logging.DEBUG, process.stdout)
236
237    _LOG.debug('Successfully flashed firmware to device')
238
239
240def handle_test_results(test_output):
241    """Parses test output to determine whether tests passed or failed."""
242
243    if test_output.find(_TESTS_STARTING_STRING) == -1:
244        raise TestingFailure('Failed to find test start')
245
246    if test_output.rfind(_TESTS_DONE_STRING) == -1:
247        log_subprocess_output(logging.INFO, test_output)
248        raise TestingFailure('Tests did not complete')
249
250    if test_output.rfind(_TEST_FAILURE_STRING) != -1:
251        log_subprocess_output(logging.INFO, test_output)
252        raise TestingFailure('Test suite had one or more failures')
253
254    log_subprocess_output(logging.DEBUG, test_output)
255
256    _LOG.info('Test passed!')
257
258
259def _threaded_test_reader(dest, port, baud_rate, test_timeout):
260    """Parses test output to the mutable "dest" passed to this function."""
261    dest.append(read_serial(port, baud_rate, test_timeout))
262
263
264def run_device_test(
265    binary, test_timeout, openocd_config, baud, stlink_serial=None, port=None
266) -> bool:
267    """Flashes, runs, and checks an on-device test binary.
268
269    Returns true on test pass.
270    """
271
272    if stlink_serial is None and port is None:
273        _LOG.debug('Attempting to automatically detect dev board')
274        boards = stm32f429i_detector.detect_boards()
275        if not boards:
276            error = 'Could not find an attached device'
277            _LOG.error(error)
278            raise DeviceNotFound(error)
279        stlink_serial = boards[0].serial_number
280        port = boards[0].dev_name
281
282    _LOG.debug('Launching test binary %s', binary)
283    try:
284        # Begin capturing test output via another thread BEFORE flashing the
285        # device since the test will automatically run after the image is
286        # flashed. This reduces flake since there isn't a need to time a reset
287        # correctly relative to the start of capturing device output.
288        result: list[bytes] = []
289        threaded_reader_args = (result, port, baud, test_timeout)
290        read_thread = threading.Thread(
291            target=_threaded_test_reader, args=threaded_reader_args
292        )
293        read_thread.start()
294        _LOG.info('Running test')
295        flash_device(binary, openocd_config, stlink_serial)
296        read_thread.join()
297        if result:
298            handle_test_results(result[0])
299    except TestingFailure as err:
300        _LOG.error(err)
301        return False
302
303    return True
304
305
306def main():
307    """Set up runner, and then flash/run device test."""
308    args = parse_args()
309
310    # Try to use pw_cli logs, else default to something reasonable.
311    try:
312        import pw_cli.log  # pylint: disable=import-outside-toplevel
313
314        log_level = logging.DEBUG if args.verbose else logging.INFO
315        pw_cli.log.install(level=log_level)
316    except ImportError:
317        coloredlogs.install(
318            level='DEBUG' if args.verbose else 'INFO',
319            level_styles={'debug': {'color': 244}, 'error': {'color': 'red'}},
320            fmt='%(asctime)s %(levelname)s | %(message)s',
321        )
322
323    if run_device_test(
324        args.binary,
325        args.test_timeout,
326        args.openocd_config,
327        args.baud,
328        args.stlink_serial,
329        args.port,
330    ):
331        sys.exit(0)
332    else:
333        sys.exit(1)
334
335
336if __name__ == '__main__':
337    main()
338