1# Copyright 2019 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Runs Pigweed unit tests built using GN.""" 15 16from __future__ import annotations 17 18import argparse 19import asyncio 20import base64 21import datetime 22import enum 23import json 24import logging 25import os 26import re 27import subprocess 28import sys 29import time 30 31from pathlib import Path 32from typing import Iterable, Sequence, Set 33 34import requests 35 36import pw_cli.log 37import pw_cli.process 38 39# Global logger for the script. 40_LOG: logging.Logger = logging.getLogger(__name__) 41 42_ANSI_SEQUENCE_REGEX = re.compile(rb'\x1b[^m]*m') 43 44 45def _strip_ansi(bytes_with_sequences: bytes) -> bytes: 46 """Strip out ANSI escape sequences.""" 47 return _ANSI_SEQUENCE_REGEX.sub(b'', bytes_with_sequences) 48 49 50def register_arguments(parser: argparse.ArgumentParser) -> None: 51 """Registers command-line arguments.""" 52 53 parser.add_argument( 54 '--root', 55 type=str, 56 default='out', 57 help='Path to the root build directory', 58 ) 59 parser.add_argument( 60 '-r', 61 '--runner', 62 type=str, 63 required=True, 64 help='Executable which runs a test on the target', 65 ) 66 parser.add_argument( 67 '-m', '--timeout', type=float, help='Timeout for test runner in seconds' 68 ) 69 parser.add_argument( 70 '-e', 71 '--env', 72 nargs='*', 73 help='Environment variables to set for the test. These should be of the' 74 ' form `var_name=value`.', 75 ) 76 77 parser.add_argument( 78 'runner_args', nargs="*", help='Arguments to forward to the test runner' 79 ) 80 81 # The runner script can either run binaries directly or groups. 82 group = parser.add_mutually_exclusive_group() 83 group.add_argument( 84 '-g', '--group', action='append', help='Test groups to run' 85 ) 86 group.add_argument( 87 '-t', '--test', action='append', help='Test binaries to run' 88 ) 89 90 91class TestResult(enum.Enum): 92 """Result of a single unit test run.""" 93 94 UNKNOWN = 0 95 SUCCESS = 1 96 FAILURE = 2 97 98 99class Test: 100 """A unit test executable.""" 101 102 def __init__(self, name: str, file_path: str) -> None: 103 self.name: str = name 104 self.file_path: str = file_path 105 self.status: TestResult = TestResult.UNKNOWN 106 self.start_time: datetime.datetime 107 self.duration_s: float 108 109 def __repr__(self) -> str: 110 return f'Test({self.name})' 111 112 def __eq__(self, other: object) -> bool: 113 if not isinstance(other, Test): 114 return NotImplemented 115 return self.file_path == other.file_path 116 117 def __hash__(self) -> int: 118 return hash(self.file_path) 119 120 121class TestGroup: 122 """Graph node representing a group of unit tests.""" 123 124 def __init__(self, name: str, tests: Iterable[Test]): 125 self._name: str = name 126 self._deps: Iterable['TestGroup'] = [] 127 self._tests: Iterable[Test] = tests 128 129 def set_deps(self, deps: Iterable['TestGroup']) -> None: 130 """Updates the dependency list of this group.""" 131 self._deps = deps 132 133 def all_test_dependencies(self) -> list[Test]: 134 """Returns a list of all tests in this group and its dependencies.""" 135 return list(self._all_test_dependencies(set())) 136 137 def _all_test_dependencies(self, processed_groups: Set[str]) -> Set[Test]: 138 if self._name in processed_groups: 139 return set() 140 141 tests: Set[Test] = set() 142 for dep in self._deps: 143 tests.update( 144 dep._all_test_dependencies( # pylint: disable=protected-access 145 processed_groups 146 ) 147 ) 148 149 tests.update(self._tests) 150 processed_groups.add(self._name) 151 152 return tests 153 154 def __repr__(self) -> str: 155 return f'TestGroup({self._name})' 156 157 158class TestRunner: 159 """Runs unit tests by calling out to a runner script.""" 160 161 def __init__( 162 self, 163 executable: str, 164 args: Sequence[str], 165 tests: Iterable[Test], 166 env: dict[str, str] | None = None, 167 timeout: float | None = None, 168 verbose: bool = False, 169 ) -> None: 170 self._executable: str = executable 171 self._args: Sequence[str] = args 172 self._tests: list[Test] = list(tests) 173 self._env: dict[str, str] = env or {} 174 self._timeout = timeout 175 self._result_sink: dict[str, str] | None = None 176 self.verbose = verbose 177 178 # Access go/result-sink, if available. 179 ctx_path = Path(os.environ.get("LUCI_CONTEXT", '')) 180 if not ctx_path.is_file(): 181 return 182 183 ctx = json.loads(ctx_path.read_text(encoding='utf-8')) 184 self._result_sink = ctx.get('result_sink', None) 185 186 async def run_tests(self) -> None: 187 """Runs all registered unit tests through the runner script.""" 188 189 for idx, test in enumerate(self._tests, 1): 190 total = str(len(self._tests)) 191 test_counter = f'Test {idx:{len(total)}}/{total}' 192 193 _LOG.debug('%s: [ RUN] %s', test_counter, test.name) 194 195 # Convert POSIX to native directory seperators as GN produces '/' 196 # but the Windows test runner needs '\\'. 197 command = [ 198 str(Path(self._executable)), 199 *self._args, 200 str(Path(test.file_path)), 201 ] 202 203 if self._executable.endswith('.py'): 204 command.insert(0, sys.executable) 205 206 test.start_time = datetime.datetime.now(datetime.timezone.utc) 207 start_time = time.monotonic() 208 try: 209 process = await pw_cli.process.run_async( 210 *command, 211 env=self._env, 212 timeout=self._timeout, 213 log_output=self.verbose, 214 ) 215 except subprocess.CalledProcessError as err: 216 _LOG.error(err) 217 return 218 test.duration_s = time.monotonic() - start_time 219 220 if process.returncode == 0: 221 test.status = TestResult.SUCCESS 222 test_result = 'PASS' 223 else: 224 test.status = TestResult.FAILURE 225 test_result = 'FAIL' 226 227 _LOG.log( 228 pw_cli.log.LOGLEVEL_STDOUT, 229 '[Pid: %s]\n%s', 230 pw_cli.color.colors().bold_white(process.pid), 231 process.output.decode(errors='ignore').rstrip(), 232 ) 233 234 _LOG.info( 235 '%s: [%s] %s in %.3f s', 236 test_counter, 237 test_result, 238 test.name, 239 test.duration_s, 240 ) 241 242 try: 243 self._maybe_upload_to_resultdb(test, process) 244 except requests.exceptions.HTTPError as err: 245 _LOG.error(err) 246 return 247 248 def all_passed(self) -> bool: 249 """Returns true if all unit tests passed.""" 250 return all(test.status is TestResult.SUCCESS for test in self._tests) 251 252 def _maybe_upload_to_resultdb( 253 self, test: Test, process: pw_cli.process.CompletedProcess 254 ): 255 """Uploads test result to ResultDB, if available.""" 256 if self._result_sink is None: 257 # ResultDB integration not enabled. 258 return 259 260 test_result = { 261 # The test.name is not suitable as an identifier because it's just 262 # the basename of the test (channel_test). We want the full path, 263 # including the toolchain used. 264 "testId": test.file_path, 265 # ResultDB also supports CRASH and ABORT, but there's currently no 266 # way to distinguish these in pw_unit_test. 267 "status": "PASS" if test.status is TestResult.SUCCESS else "FAIL", 268 # The "expected" field is required. It could be used to report 269 # expected failures, but we don't currently support these in 270 # pw_unit_test. 271 "expected": test.status is TestResult.SUCCESS, 272 # Ensure to format the duration with '%.9fs' to avoid scientific 273 # notation. If a value is too large or small and formatted with 274 # str() or '%s', python formats the value in scientific notation, 275 # like '1.1e-10', which is an invalid input for 276 # google.protobuf.duration. 277 "duration": "%.9fs" % test.duration_s, 278 "start_time": test.start_time.isoformat(), 279 "testMetadata": { 280 # Use the file path as the test name in the Milo UI. (If this is 281 # left unspecified, the UI will attempt to build a "good enough" 282 # name by truncating the testId. That produces less readable 283 # results.) 284 "name": test.file_path, 285 }, 286 "summaryHtml": ( 287 '<p><text-artifact ' 288 'artifact-id="artifact-content-in-request"></p>' 289 ), 290 "artifacts": { 291 "artifact-content-in-request": { 292 # Need to decode the bytes back to ASCII or they will not be 293 # encodable by json.dumps. 294 # 295 # TODO: b/248349219 - Instead of stripping the ANSI color 296 # codes, convert them to HTML. 297 "contents": base64.b64encode( 298 _strip_ansi(process.output) 299 ).decode('ascii'), 300 }, 301 }, 302 } 303 304 requests.post( 305 url='http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' 306 % self._result_sink['address'], 307 headers={ 308 'Content-Type': 'application/json', 309 'Accept': 'application/json', 310 'Authorization': 'ResultSink %s' 311 % self._result_sink['auth_token'], 312 }, 313 data=json.dumps({'testResults': [test_result]}), 314 timeout=5.0, 315 ).raise_for_status() 316 317 318# Filename extension for unit test metadata files. 319METADATA_EXTENSION = '.testinfo.json' 320 321 322def find_test_metadata(root: str) -> list[str]: 323 """Locates all test metadata files located within a directory tree.""" 324 325 metadata: list[str] = [] 326 for path, _, files in os.walk(root): 327 for filename in files: 328 if not filename.endswith(METADATA_EXTENSION): 329 continue 330 331 full_path = os.path.join(path, filename) 332 _LOG.debug('Found group metadata at %s', full_path) 333 metadata.append(full_path) 334 335 return metadata 336 337 338# TODO(frolv): This is copied from the Python runner script. 339# It should be extracted into a library and imported instead. 340def find_binary(target: str) -> str: 341 """Tries to find a binary for a gn build target. 342 343 Args: 344 target: Relative filesystem path to the target's output directory and 345 target name, separated by a colon. 346 347 Returns: 348 Full path to the target's binary. 349 350 Raises: 351 FileNotFoundError: No binary found for target. 352 """ 353 354 target_path, target_name = target.split(':') 355 356 for extension in ['', '.elf', '.exe']: 357 potential_filename = f'{target_path}/{target_name}{extension}' 358 if os.path.isfile(potential_filename): 359 return potential_filename 360 361 raise FileNotFoundError( 362 f'Could not find output binary for build target {target}' 363 ) 364 365 366def parse_metadata(metadata: list[str], root: str) -> dict[str, TestGroup]: 367 """Builds a graph of test group objects from metadata. 368 369 Args: 370 metadata: List of paths to JSON test metadata files. 371 root: Root output directory of the build. 372 373 Returns: 374 Map of group name to TestGroup object. All TestGroup objects are fully 375 populated with the paths to their unit tests and references to their 376 dependencies. 377 """ 378 379 def canonicalize(path: str) -> str: 380 """Removes a trailing slash from a GN target's directory. 381 382 '//module:target' -> //module:target 383 '//module/:target' -> //module:target 384 """ 385 index = path.find(':') 386 if index == -1 or path[index - 1] != '/': 387 return path 388 return path[: index - 1] + path[index:] 389 390 group_deps: list[tuple[str, list[str]]] = [] 391 all_tests: dict[str, Test] = {} 392 test_groups: dict[str, TestGroup] = {} 393 num_tests = 0 394 395 for path in metadata: 396 with open(path, 'r') as metadata_file: 397 metadata_list = json.load(metadata_file) 398 399 deps: list[str] = [] 400 tests: list[Test] = [] 401 402 for entry in metadata_list: 403 if entry['type'] == 'self': 404 group_name = canonicalize(entry['name']) 405 elif entry['type'] == 'dep': 406 deps.append(canonicalize(entry['group'])) 407 elif entry['type'] == 'test': 408 test_directory = os.path.join(root, entry['test_directory']) 409 test_binary = find_binary( 410 f'{test_directory}:{entry["test_name"]}' 411 ) 412 413 if test_binary not in all_tests: 414 all_tests[test_binary] = Test( 415 entry['test_name'], test_binary 416 ) 417 418 tests.append(all_tests[test_binary]) 419 420 if deps: 421 group_deps.append((group_name, deps)) 422 423 num_tests += len(tests) 424 test_groups[group_name] = TestGroup(group_name, tests) 425 426 for name, deps in group_deps: 427 test_groups[name].set_deps([test_groups[dep] for dep in deps]) 428 429 _LOG.info('Found %d test groups (%d tests).', len(metadata), num_tests) 430 return test_groups 431 432 433def tests_from_groups( 434 group_names: Sequence[str] | None, root: str 435) -> list[Test]: 436 """Returns unit tests belonging to test groups and their dependencies. 437 438 If args.names is nonempty, only searches groups specified there. 439 Otherwise, finds tests from all known test groups. 440 """ 441 442 _LOG.info('Scanning for tests...') 443 metadata = find_test_metadata(root) 444 test_groups = parse_metadata(metadata, root) 445 446 groups_to_run = group_names if group_names else test_groups.keys() 447 tests_to_run: Set[Test] = set() 448 449 for name in groups_to_run: 450 try: 451 tests_to_run.update(test_groups[name].all_test_dependencies()) 452 except KeyError: 453 _LOG.error('Unknown test group: %s', name) 454 sys.exit(1) 455 456 _LOG.info('Running test groups %s', ', '.join(groups_to_run)) 457 return list(tests_to_run) 458 459 460def tests_from_paths(paths: Sequence[str]) -> list[Test]: 461 """Returns a list of tests from test executable paths.""" 462 463 tests: list[Test] = [] 464 for path in paths: 465 name = os.path.splitext(os.path.basename(path))[0] 466 tests.append(Test(name, path)) 467 return tests 468 469 470def parse_env(env: Sequence[str]) -> dict[str, str]: 471 """Returns a dictionary of environment names and values. 472 473 Args: 474 env: List of strings of the form "key=val". 475 476 Raises: 477 ValueError if `env` is malformed. 478 """ 479 envvars = {} 480 if env: 481 for envvar in env: 482 parts = envvar.split('=') 483 if len(parts) != 2: 484 raise ValueError(f'malformed environment variable: {envvar}') 485 envvars[parts[0]] = parts[1] 486 return envvars 487 488 489async def find_and_run_tests( 490 root: str, 491 runner: str, 492 runner_args: Sequence[str] = (), 493 env: Sequence[str] = (), 494 timeout: float | None = None, 495 group: Sequence[str] | None = None, 496 test: Sequence[str] | None = None, 497 verbose: bool = False, 498) -> int: 499 """Runs some unit tests.""" 500 501 if test: 502 tests = tests_from_paths(test) 503 else: 504 tests = tests_from_groups(group, root) 505 506 envvars = parse_env(env) 507 508 test_runner = TestRunner( 509 runner, runner_args, tests, envvars, timeout, verbose 510 ) 511 await test_runner.run_tests() 512 513 return 0 if test_runner.all_passed() else 1 514 515 516def main() -> int: 517 """Run Pigweed unit tests built using GN.""" 518 519 parser = argparse.ArgumentParser(description=main.__doc__) 520 register_arguments(parser) 521 parser.add_argument( 522 '-v', 523 '--verbose', 524 action='store_true', 525 help='Output additional logs as the script runs', 526 ) 527 528 args_as_dict = dict(vars(parser.parse_args())) 529 return asyncio.run(find_and_run_tests(**args_as_dict)) 530 531 532if __name__ == '__main__': 533 pw_cli.log.install(hide_timestamp=True) 534 sys.exit(main()) 535