xref: /aosp_15_r20/external/pigweed/targets/rp2040/py/rp2040_utils/unit_test_server.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Launch a pw_target_runner server to use for multi-device testing."""
16
17import argparse
18import logging
19import sys
20import tempfile
21from typing import IO
22
23import pw_cli.process
24import pw_cli.log
25
26try:
27    from rp2040_utils import device_detector
28except ImportError:
29    # Load from this directory if rp2040_utils is not available.
30    import device_detector  # type: ignore
31
# Logger used by every function in this script.
_LOG = logging.getLogger('unit_test_server')

# Port handed to the server when --server-port is not given.
DEFAULT_PORT = 34172

# When launched through Bazel, the test runner and the server binary are
# resolved from fixed runfiles locations. If the Bazel runfiles library cannot
# be imported, fall back to bare command names instead.
try:
    from python.runfiles import runfiles  # type: ignore

    r = runfiles.Create()
    _TEST_RUNNER_COMMAND, _TEST_SERVER_COMMAND = (
        r.Rlocation('pigweed/targets/rp2040/py/rpc_unit_test_runner'),
        r.Rlocation('pigweed/pw_target_runner/go/cmd/server_/server'),
    )
except ImportError:
    _TEST_RUNNER_COMMAND, _TEST_SERVER_COMMAND = (
        'rp2040_unit_test_runner',
        'pw_target_runner_server',
    )
51
52
def parse_args() -> argparse.Namespace:
    """Builds the command-line interface and parses the process arguments.

    Returns:
        The parsed namespace with ``server_port``, ``server_config``,
        ``debug_probe_only``, ``pico_only``, ``verbose`` and ``chip``.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Server settings.
    cli.add_argument(
        '--server-port',
        type=int,
        default=DEFAULT_PORT,
        help='Port to launch the pw_target_runner_server on',
    )
    cli.add_argument(
        '--server-config',
        type=argparse.FileType('r'),
        help='Path to server config file',
    )

    # Device filters; main() rejects the combination of both.
    cli.add_argument(
        '--debug-probe-only',
        action='store_true',
        help='Only run tests on detected Pi Pico debug probes',
    )
    cli.add_argument(
        '--pico-only',
        action='store_true',
        help='Only run tests on detected Pi Pico boards (NOT debug probes)',
    )

    cli.add_argument(
        '--verbose',
        '-v',
        action='store_true',
        help='Output additional logs as the script runs',
    )

    # The chip is mandatory and restricted to the supported RP2 parts.
    supported_chips = ['RP2040', 'RP2350']
    cli.add_argument(
        '--chip',
        required=True,
        choices=supported_chips,
        help='RP2 chip connected to a debug probe (RP2040 or RP2350)',
    )

    parsed = cli.parse_args()
    return parsed
98
99
def generate_runner(command: str, arguments: list[str]) -> str:
    """Generates a text-proto style pw_target_runner_server configuration.

    Args:
        command: Executable the server should invoke for this runner.
        arguments: Arguments for the command; each becomes one ``args`` line.
            The sequence is only read, never modified.

    Returns:
        A ``runner { ... }`` stanza whose closing brace is followed by a
        newline, so consecutive stanzas can be written back to back.
    """
    # TODO(amontanez): Use a real proto library to generate this when we have
    # one set up.
    # NOTE: values are embedded verbatim; quotes and backslashes inside
    # `command` or `arguments` are not escaped.
    runner = ['runner {', f'  command:"{command}"']
    # Format into new strings rather than overwriting entries of `arguments`,
    # so the caller's list still holds the raw argument values afterwards.
    runner.extend(f'  args: "{arg}"' for arg in arguments)
    runner.append('}\n')
    return '\n'.join(runner)
110
111
def generate_server_config(
    chip: str, include_picos: bool = True, include_debug_probes: bool = True
) -> IO[bytes]:
    """Detects attached boards and writes a server config for them.

    Exits the process with status 1 when no boards are found, or when debug
    probes and directly attached boards are detected together.

    Args:
        chip: Chip name forwarded to each runner via ``--chip``.
        include_picos: Forwarded to the board detector.
        include_debug_probes: Forwarded to the board detector.

    Returns:
        The open, flushed temporary file holding one runner entry per board
        that will be used for testing.
    """
    boards = device_detector.detect_boards(
        include_picos=include_picos,
        include_debug_probes=include_debug_probes,
    )

    if not boards:
        _LOG.critical('No attached boards detected')
        sys.exit(1)

    # If this set ends up holding both True and False, both kinds of device
    # are attached at once.
    kinds_present = {bool(board.is_debug_probe()) for board in boards}
    if len(kinds_present) > 1:
        _LOG.critical(
            'Debug probes and picos both detected. Mixed device configurations '
            'are not supported. Please only connect Picos directly, or only '
            'connect debug probes! You can also use --pico-only or '
            '--debug-probe-only to filter attached devices.'
        )
        sys.exit(1)

    config_file = tempfile.NamedTemporaryFile()
    _LOG.debug('Generating test server config at %s', config_file.name)
    _LOG.debug('Found %d attached devices', len(boards))

    non_probe_count = sum(1 for board in boards if not board.is_debug_probe())
    if non_probe_count > 1:
        # TODO: https://pwbug.dev/290245354 - Multi-device flashing doesn't work
        # due to limitations of picotool. Limit to one device even if multiple
        # are connected.
        _LOG.warning(
            'TODO: https://pwbug.dev/290245354 - Multiple non-debugprobe '
            ' boards attached. '
            'Disabling parallel testing.'
        )
        boards = boards[:1]

    # Emit one runner stanza per board, addressed by its USB bus and port.
    for board in boards:
        bus = str(board.bus)
        port = str(board.port)
        stanza = generate_runner(
            _TEST_RUNNER_COMMAND,
            ['--usb-bus', bus, '--usb-port', port, '--chip', chip],
        )
        config_file.write(stanza.encode('utf-8'))

    # Flush so the contents are on disk before the path is handed to anyone.
    config_file.flush()
    return config_file
169
170
def launch_server(
    server_config: IO[bytes] | None,
    server_port: int | None,
    chip: str,
    include_picos: bool,
    include_debug_probes: bool,
) -> int:
    """Launch a device test server with the provided arguments.

    Args:
        server_config: Open config file whose ``name`` is passed to the
            server. When None, a config is generated from detected boards.
        server_port: Port passed via ``-port``; omitted from the command when
            None.
        chip: Chip name used when generating a config.
        include_picos: Board-detection filter used when generating a config.
        include_debug_probes: Board-detection filter used when generating a
            config.

    Returns:
        The return code of the server process.
    """
    generated_config = None
    if server_config is None:
        # Auto-detect attached boards if no config is provided.
        generated_config = generate_server_config(
            chip, include_picos, include_debug_probes
        )
        server_config = generated_config

    try:
        cmd = [_TEST_SERVER_COMMAND, '-config', server_config.name]

        if server_port is not None:
            cmd.extend(['-port', str(server_port)])

        return pw_cli.process.run(*cmd, log_output=True).returncode
    finally:
        # Close (and thereby remove) the temporary config created here once
        # the server has exited, instead of relying on garbage collection.
        # A caller-supplied config is left open; it belongs to the caller.
        if generated_config is not None:
            generated_config.close()
191
192
def main():
    """Parses the flags, sets up logging, and exits with the server's code."""
    args = parse_args()

    if args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    pw_cli.log.install(level=log_level)

    # Each "only" flag switches off the other device kind, so asking for both
    # is rejected.
    if args.debug_probe_only and args.pico_only:
        _LOG.critical('Cannot specify both --pico-only and --debug-probe-only')
        sys.exit(1)

    sys.exit(
        launch_server(
            server_config=args.server_config,
            server_port=args.server_port,
            chip=args.chip,
            include_picos=not args.debug_probe_only,
            include_debug_probes=not args.pico_only,
        )
    )


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
216