xref: /aosp_15_r20/external/pigweed/pw_unit_test/py/pw_unit_test/test_runner.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2019 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Runs Pigweed unit tests built using GN."""
15
16from __future__ import annotations
17
18import argparse
19import asyncio
20import base64
21import datetime
22import enum
23import json
24import logging
25import os
26import re
27import subprocess
28import sys
29import time
30
31from pathlib import Path
32from typing import Iterable, Sequence, Set
33
34import requests
35
36import pw_cli.log
37import pw_cli.process
38
# Global logger for the script.
_LOG: logging.Logger = logging.getLogger(__name__)

# Matches one complete ANSI CSI escape sequence: ESC '[', any parameter bytes
# (0x30-0x3F), any intermediate bytes (0x20-0x2F) and a single final byte
# (0x40-0x7E). Color (SGR) codes end in 'm', but other sequences such as
# "erase line" (ESC[2K) end in a different byte, so the pattern must not
# assume 'm' is the terminator; doing so would swallow real output up to the
# next letter 'm'.
_ANSI_SEQUENCE_REGEX = re.compile(rb'\x1b\[[0-?]*[ -/]*[@-~]')


def _strip_ansi(bytes_with_sequences: bytes) -> bytes:
    """Strips ANSI CSI escape sequences out of raw process output.

    Args:
        bytes_with_sequences: Output that may contain escape sequences such
            as color codes.

    Returns:
        The input with every CSI escape sequence removed; all other bytes
        are left untouched.
    """
    return _ANSI_SEQUENCE_REGEX.sub(b'', bytes_with_sequences)
48
49
def register_arguments(parser: argparse.ArgumentParser) -> None:
    """Adds the test runner's command-line options to ``parser``.

    Args:
        parser: Parser that receives the options. It is modified in place.
    """

    # Build output location.
    parser.add_argument(
        '--root', type=str, default='out', help='Path to the root build directory'
    )

    # Runner executable and how it is invoked.
    parser.add_argument(
        '-r',
        '--runner',
        type=str,
        required=True,
        help='Executable which runs a test on the target',
    )
    parser.add_argument(
        '-m',
        '--timeout',
        type=float,
        help='Timeout for test runner in seconds',
    )
    parser.add_argument(
        '-e',
        '--env',
        nargs='*',
        help='Environment variables to set for the test. These should be of the'
        ' form `var_name=value`.',
    )

    # Everything positional is handed to the runner untouched.
    parser.add_argument(
        'runner_args',
        nargs="*",
        help='Arguments to forward to the test runner',
    )

    # Tests are selected either by group or by binary, never both.
    selection = parser.add_mutually_exclusive_group()
    selection.add_argument(
        '-g',
        '--group',
        action='append',
        help='Test groups to run',
    )
    selection.add_argument(
        '-t',
        '--test',
        action='append',
        help='Test binaries to run',
    )
89
90
@enum.unique
class TestResult(enum.Enum):
    """Outcome recorded for one unit test executable."""

    # No outcome has been recorded.
    UNKNOWN = 0
    # The test was recorded as passing.
    SUCCESS = 1
    # The test was recorded as failing.
    FAILURE = 2
97
98
class Test:
    """One unit test executable, identified by the path to its binary.

    Two ``Test`` objects compare (and hash) equal when they point at the same
    ``file_path``, regardless of their ``name``.
    """

    def __init__(self, name: str, file_path: str) -> None:
        self.name: str = name
        self.file_path: str = file_path
        self.status: TestResult = TestResult.UNKNOWN
        # Declared for type checkers only; no value is assigned here.
        self.start_time: datetime.datetime
        self.duration_s: float

    def __hash__(self) -> int:
        return hash(self.file_path)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Test):
            return self.file_path == other.file_path
        # Let Python try the reflected comparison for foreign types.
        return NotImplemented

    def __repr__(self) -> str:
        return f'Test({self.name})'
118        return hash(self.file_path)
119
120
class TestGroup:
    """Graph node representing a group of unit tests.

    A group owns a collection of tests and may depend on other groups. The
    tests of a group are its own tests plus those of every group reachable
    through its dependencies.
    """

    def __init__(self, name: str, tests: Iterable['Test']) -> None:
        """Creates a group with no dependencies.

        Args:
            name: Name identifying the group; used to detect groups that
                were already visited during traversal.
            tests: Tests that belong directly to this group.
        """
        self._name: str = name
        self._deps: Iterable['TestGroup'] = []
        self._tests: Iterable['Test'] = tests

    def set_deps(self, deps: Iterable['TestGroup']) -> None:
        """Updates the dependency list of this group."""
        self._deps = deps

    def all_test_dependencies(self) -> list['Test']:
        """Returns a list of all tests in this group and its dependencies.

        Each test appears once. The order of the returned list is
        unspecified because the tests are collected in a set.
        """
        return list(self._all_test_dependencies(set()))

    def _all_test_dependencies(
        self, processed_groups: Set[str]
    ) -> Set['Test']:
        """Collects tests from this group and any unvisited dependencies.

        Args:
            processed_groups: Names of groups already visited. Updated in
                place as the traversal proceeds.

        Returns:
            The tests contributed by this group and the dependencies first
            visited through it; empty if this group was already visited.
        """
        if self._name in processed_groups:
            return set()

        # Mark this group as visited *before* recursing. Marking afterwards
        # would let a dependency cycle (a -> b -> a) recurse without bound.
        processed_groups.add(self._name)

        tests: Set['Test'] = set(self._tests)
        for dep in self._deps:
            tests.update(
                dep._all_test_dependencies(  # pylint: disable=protected-access
                    processed_groups
                )
            )

        return tests

    def __repr__(self) -> str:
        return f'TestGroup({self._name})'
155        return f'TestGroup({self._name})'
156
157
158class TestRunner:
159    """Runs unit tests by calling out to a runner script."""
160
161    def __init__(
162        self,
163        executable: str,
164        args: Sequence[str],
165        tests: Iterable[Test],
166        env: dict[str, str] | None = None,
167        timeout: float | None = None,
168        verbose: bool = False,
169    ) -> None:
170        self._executable: str = executable
171        self._args: Sequence[str] = args
172        self._tests: list[Test] = list(tests)
173        self._env: dict[str, str] = env or {}
174        self._timeout = timeout
175        self._result_sink: dict[str, str] | None = None
176        self.verbose = verbose
177
178        # Access go/result-sink, if available.
179        ctx_path = Path(os.environ.get("LUCI_CONTEXT", ''))
180        if not ctx_path.is_file():
181            return
182
183        ctx = json.loads(ctx_path.read_text(encoding='utf-8'))
184        self._result_sink = ctx.get('result_sink', None)
185
186    async def run_tests(self) -> None:
187        """Runs all registered unit tests through the runner script."""
188
189        for idx, test in enumerate(self._tests, 1):
190            total = str(len(self._tests))
191            test_counter = f'Test {idx:{len(total)}}/{total}'
192
193            _LOG.debug('%s: [ RUN] %s', test_counter, test.name)
194
195            # Convert POSIX to native directory seperators as GN produces '/'
196            # but the Windows test runner needs '\\'.
197            command = [
198                str(Path(self._executable)),
199                *self._args,
200                str(Path(test.file_path)),
201            ]
202
203            if self._executable.endswith('.py'):
204                command.insert(0, sys.executable)
205
206            test.start_time = datetime.datetime.now(datetime.timezone.utc)
207            start_time = time.monotonic()
208            try:
209                process = await pw_cli.process.run_async(
210                    *command,
211                    env=self._env,
212                    timeout=self._timeout,
213                    log_output=self.verbose,
214                )
215            except subprocess.CalledProcessError as err:
216                _LOG.error(err)
217                return
218            test.duration_s = time.monotonic() - start_time
219
220            if process.returncode == 0:
221                test.status = TestResult.SUCCESS
222                test_result = 'PASS'
223            else:
224                test.status = TestResult.FAILURE
225                test_result = 'FAIL'
226
227                _LOG.log(
228                    pw_cli.log.LOGLEVEL_STDOUT,
229                    '[Pid: %s]\n%s',
230                    pw_cli.color.colors().bold_white(process.pid),
231                    process.output.decode(errors='ignore').rstrip(),
232                )
233
234                _LOG.info(
235                    '%s: [%s] %s in %.3f s',
236                    test_counter,
237                    test_result,
238                    test.name,
239                    test.duration_s,
240                )
241
242            try:
243                self._maybe_upload_to_resultdb(test, process)
244            except requests.exceptions.HTTPError as err:
245                _LOG.error(err)
246                return
247
248    def all_passed(self) -> bool:
249        """Returns true if all unit tests passed."""
250        return all(test.status is TestResult.SUCCESS for test in self._tests)
251
252    def _maybe_upload_to_resultdb(
253        self, test: Test, process: pw_cli.process.CompletedProcess
254    ):
255        """Uploads test result to ResultDB, if available."""
256        if self._result_sink is None:
257            # ResultDB integration not enabled.
258            return
259
260        test_result = {
261            # The test.name is not suitable as an identifier because it's just
262            # the basename of the test (channel_test). We want the full path,
263            # including the toolchain used.
264            "testId": test.file_path,
265            # ResultDB also supports CRASH and ABORT, but there's currently no
266            # way to distinguish these in pw_unit_test.
267            "status": "PASS" if test.status is TestResult.SUCCESS else "FAIL",
268            # The "expected" field is required. It could be used to report
269            # expected failures, but we don't currently support these in
270            # pw_unit_test.
271            "expected": test.status is TestResult.SUCCESS,
272            # Ensure to format the duration with '%.9fs' to avoid scientific
273            # notation.  If a value is too large or small and formatted with
274            # str() or '%s', python formats the value in scientific notation,
275            # like '1.1e-10', which is an invalid input for
276            # google.protobuf.duration.
277            "duration": "%.9fs" % test.duration_s,
278            "start_time": test.start_time.isoformat(),
279            "testMetadata": {
280                # Use the file path as the test name in the Milo UI. (If this is
281                # left unspecified, the UI will attempt to build a "good enough"
282                # name by truncating the testId. That produces less readable
283                # results.)
284                "name": test.file_path,
285            },
286            "summaryHtml": (
287                '<p><text-artifact '
288                'artifact-id="artifact-content-in-request"></p>'
289            ),
290            "artifacts": {
291                "artifact-content-in-request": {
292                    # Need to decode the bytes back to ASCII or they will not be
293                    # encodable by json.dumps.
294                    #
295                    # TODO: b/248349219 - Instead of stripping the ANSI color
296                    # codes, convert them to HTML.
297                    "contents": base64.b64encode(
298                        _strip_ansi(process.output)
299                    ).decode('ascii'),
300                },
301            },
302        }
303
304        requests.post(
305            url='http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults'
306            % self._result_sink['address'],
307            headers={
308                'Content-Type': 'application/json',
309                'Accept': 'application/json',
310                'Authorization': 'ResultSink %s'
311                % self._result_sink['auth_token'],
312            },
313            data=json.dumps({'testResults': [test_result]}),
314            timeout=5.0,
315        ).raise_for_status()
316
317
# Suffix that marks a file as unit test metadata.
METADATA_EXTENSION = '.testinfo.json'


def find_test_metadata(root: str) -> list[str]:
    """Collects the paths of all test metadata files beneath ``root``.

    Args:
        root: Directory whose tree is walked.

    Returns:
        Paths (``root`` joined with the relative location) of every file
        whose name ends with ``METADATA_EXTENSION``.
    """

    found: list[str] = []
    for directory, _subdirs, filenames in os.walk(root):
        for candidate in filenames:
            if candidate.endswith(METADATA_EXTENSION):
                metadata_path = os.path.join(directory, candidate)
                _LOG.debug('Found group metadata at %s', metadata_path)
                found.append(metadata_path)

    return found
335    return metadata
336
337
# TODO(frolv): This is copied from the Python runner script.
# It should be extracted into a library and imported instead.
def find_binary(target: str) -> str:
    """Tries to find a binary for a gn build target.

    The candidates ``<dir>/<name>``, ``<dir>/<name>.elf`` and
    ``<dir>/<name>.exe`` are checked in that order.

    Args:
        target: Relative filesystem path to the target's output directory and
            target name, separated by a colon. Only the last colon is treated
            as the separator, so the directory itself may contain colons
            (for example a Windows drive letter).

    Returns:
        Full path to the target's binary.

    Raises:
        FileNotFoundError: No binary found for target.
        ValueError: ``target`` contains no colon.
    """

    # rpartition splits on the LAST colon; a plain split(':') breaks on
    # directories that contain one themselves.
    target_path, separator, target_name = target.rpartition(':')
    if not separator:
        raise ValueError(
            f'Build target {target} is not of the form <directory>:<name>'
        )

    for extension in ['', '.elf', '.exe']:
        potential_filename = f'{target_path}/{target_name}{extension}'
        if os.path.isfile(potential_filename):
            return potential_filename

    raise FileNotFoundError(
        f'Could not find output binary for build target {target}'
    )
363    )
364
365
def parse_metadata(metadata: list[str], root: str) -> dict[str, TestGroup]:
    """Builds a graph of test group objects from metadata.

    Each metadata file is a JSON list of entries. An entry of type ``self``
    names the group, ``dep`` entries name groups it depends on, and ``test``
    entries describe test binaries located relative to ``root``.

    Args:
        metadata: List of paths to JSON test metadata files.
        root: Root output directory of the build.

    Returns:
        Map of group name to TestGroup object. All TestGroup objects are fully
        populated with the paths to their unit tests and references to their
        dependencies.

    Raises:
        ValueError: A metadata file has no ``self`` entry naming its group.
        FileNotFoundError: A listed test binary could not be found.
        KeyError: A group depends on a group with no metadata file.
    """

    def canonicalize(path: str) -> str:
        """Removes a trailing slash from a GN target's directory.

        '//module:target'  -> //module:target
        '//module/:target' -> //module:target
        """
        index = path.find(':')
        # index <= 0 means there is no colon, or nothing precedes it. Without
        # this guard, index 0 would inspect path[-1] (the LAST character).
        if index <= 0 or path[index - 1] != '/':
            return path
        return path[: index - 1] + path[index:]

    group_deps: list[tuple[str, list[str]]] = []
    all_tests: dict[str, Test] = {}
    test_groups: dict[str, TestGroup] = {}
    num_tests = 0

    for path in metadata:
        with open(path, 'r', encoding='utf-8') as metadata_file:
            metadata_list = json.load(metadata_file)

        # Reset per file so a file without a 'self' entry cannot silently
        # inherit the previous file's group name.
        group_name: str | None = None
        deps: list[str] = []
        tests: list[Test] = []

        for entry in metadata_list:
            if entry['type'] == 'self':
                group_name = canonicalize(entry['name'])
            elif entry['type'] == 'dep':
                deps.append(canonicalize(entry['group']))
            elif entry['type'] == 'test':
                test_directory = os.path.join(root, entry['test_directory'])
                test_binary = find_binary(
                    f'{test_directory}:{entry["test_name"]}'
                )

                # Share one Test object per binary across all groups.
                if test_binary not in all_tests:
                    all_tests[test_binary] = Test(
                        entry['test_name'], test_binary
                    )

                tests.append(all_tests[test_binary])

        if group_name is None:
            raise ValueError(
                f'Test metadata file {path} has no "self" entry naming its '
                'group'
            )

        if deps:
            group_deps.append((group_name, deps))

        num_tests += len(tests)
        test_groups[group_name] = TestGroup(group_name, tests)

    # Dependencies are linked only after every group exists, since a group
    # may be referenced before its own metadata file has been read.
    for name, deps in group_deps:
        test_groups[name].set_deps([test_groups[dep] for dep in deps])

    _LOG.info('Found %d test groups (%d tests).', len(metadata), num_tests)
    return test_groups
430    return test_groups
431
432
def tests_from_groups(
    group_names: Sequence[str] | None, root: str
) -> list[Test]:
    """Returns unit tests belonging to test groups and their dependencies.

    If ``group_names`` is nonempty, only the groups named there are searched.
    Otherwise, finds tests from all known test groups. Exits the process with
    status 1 if a requested group is unknown.

    Args:
        group_names: Names of the groups to run, or None/empty for all.
        root: Root build directory scanned for test metadata.

    Returns:
        The selected tests without duplicates, sorted by binary path so the
        execution order is the same on every run.
    """

    _LOG.info('Scanning for tests...')
    metadata = find_test_metadata(root)
    test_groups = parse_metadata(metadata, root)

    groups_to_run = group_names if group_names else test_groups.keys()
    tests_to_run: Set[Test] = set()

    for name in groups_to_run:
        # Only the lookup is guarded; errors raised while collecting the
        # group's tests must not be misreported as an unknown group.
        try:
            group = test_groups[name]
        except KeyError:
            _LOG.error('Unknown test group: %s', name)
            sys.exit(1)
        tests_to_run.update(group.all_test_dependencies())

    _LOG.info('Running test groups %s', ', '.join(groups_to_run))

    # Set iteration order depends on string hashing, which is randomized per
    # process. Sort so tests run in a reproducible order.
    return sorted(tests_to_run, key=lambda test: test.file_path)
457    return list(tests_to_run)
458
459
def tests_from_paths(paths: Sequence[str]) -> list[Test]:
    """Builds one ``Test`` per executable path.

    Each test is named after the path's final component with its last
    extension removed.
    """

    return [
        Test(os.path.splitext(os.path.basename(binary))[0], binary)
        for binary in paths
    ]
467    return tests
468
469
def parse_env(env: Sequence[str]) -> dict[str, str]:
    """Returns a dictionary of environment names and values.

    Each entry is split at its first ``=`` only, so values may themselves
    contain ``=`` (for example ``FLAGS=--opt=1``). An empty value
    (``NAME=``) is allowed.

    Args:
        env: List of strings of the form "key=val". A falsy value (None or
            an empty sequence) yields an empty dictionary.

    Raises:
        ValueError if `env` is malformed: an entry has no ``=`` or has an
        empty name.
    """
    envvars: dict[str, str] = {}
    for envvar in env or ():
        name, separator, value = envvar.partition('=')
        if not separator or not name:
            raise ValueError(f'malformed environment variable: {envvar}')
        envvars[name] = value
    return envvars
486    return envvars
487
488
async def find_and_run_tests(
    root: str,
    runner: str,
    runner_args: Sequence[str] = (),
    env: Sequence[str] = (),
    timeout: float | None = None,
    group: Sequence[str] | None = None,
    test: Sequence[str] | None = None,
    verbose: bool = False,
) -> int:
    """Selects unit tests, runs them and reports an exit status.

    Explicit test binaries (``test``) take precedence; otherwise tests are
    discovered from the groups under ``root``.

    Returns:
        0 if every test passed, 1 otherwise.
    """

    selected = tests_from_paths(test) if test else tests_from_groups(group, root)

    harness = TestRunner(
        runner, runner_args, selected, parse_env(env), timeout, verbose
    )
    await harness.run_tests()

    if harness.all_passed():
        return 0
    return 1
513    return 0 if test_runner.all_passed() else 1
514
515
def main() -> int:
    """Run Pigweed unit tests built using GN."""
    # The docstring above doubles as the --help description; keep it terse.
    parser = argparse.ArgumentParser(description=main.__doc__)
    register_arguments(parser)
    parser.add_argument(
        '-v',
        '--verbose',
        help='Output additional logs as the script runs',
        action='store_true',
    )

    # Argument names match the keyword parameters of find_and_run_tests.
    namespace = parser.parse_args()
    return asyncio.run(find_and_run_tests(**vars(namespace)))
529    return asyncio.run(find_and_run_tests(**args_as_dict))
530
531
if __name__ == '__main__':
    # Set up logging first, then use main()'s result as the exit status.
    pw_cli.log.install(hide_timestamp=True)
    exit_status = main()
    sys.exit(exit_status)
534    sys.exit(main())
535