1#!/usr/bin/env python3 2# 3# Copyright 2022, The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Integration tests for the Atest Bazel mode feature.""" 18 19# pylint: disable=invalid-name 20# pylint: disable=missing-class-docstring 21# pylint: disable=missing-function-docstring 22 23import json 24import os 25from pathlib import Path 26import re 27import shutil 28import subprocess 29import tempfile 30from typing import Any, Dict 31import unittest 32 33 34class ResultCompareTest(unittest.TestCase): 35 36 def setUp(self): 37 self.src_root_path = Path(os.environ['ANDROID_BUILD_TOP']) 38 self.out_dir_path = Path(tempfile.mkdtemp()) 39 self.test_env = self.setup_test_env() 40 41 def tearDown(self): 42 shutil.rmtree(self.out_dir_path) 43 44 def test_standard_mode_and_bazel_mode_result_equal(self): 45 standard_mode_result = self.get_test_result( 46 shell_cmd='atest -c -m --host --host-unit-test-only' 47 ) 48 49 bazel_mode_result = self.get_test_result( 50 shell_cmd=( 51 'atest -c --bazel-mode --host --host-unit-test-only ' 52 '--bazel-arg=--test_timeout=300' 53 ), 54 is_bazel_mode=True, 55 ) 56 57 self.assert_test_result_equal(standard_mode_result, bazel_mode_result) 58 59 def setup_test_env(self) -> Dict[str, Any]: 60 test_env = { 61 'PATH': os.environ['PATH'], 62 'HOME': os.environ['HOME'], 63 'OUT_DIR': str(self.out_dir_path), 64 } 65 return test_env 66 67 def get_test_result( 68 self, 69 shell_cmd: str, 70 is_bazel_mode: bool = False, 71 ) -> Dict[str, str]: 72 result_file_name = 'test_result' 73 if is_bazel_mode: 74 shell_cmd = ( 75 f'{shell_cmd} --bazel-arg=--build_event_json_file={result_file_name}' 76 ) 77 78 completed_process = self.run_shell_command(shell_cmd) 79 result_file_path = self.get_result_file_path( 80 completed_process, result_file_name, is_bazel_mode 81 ) 82 83 if is_bazel_mode: 84 return parse_bazel_result(result_file_path) 85 return parse_standard_result(result_file_path) 86 87 def get_result_file_path( 88 self, 89 completed_process: subprocess.CompletedProcess, 90 result_file_name: str, 91 is_bazel_mode: bool = False, 92 ) -> Path: 93 if is_bazel_mode: 94 return self.out_dir_path.joinpath( 95 'atest_bazel_workspace', result_file_name 96 ) 97 98 result_file_path = None 99 log_dir_prefix = 'Atest results and logs directory: ' 100 for line in completed_process.stdout.decode().splitlines(): 101 if line.startswith(log_dir_prefix): 102 result_file_path = Path(line[len(log_dir_prefix) :]) / result_file_name 103 break 104 105 if not result_file_path: 106 raise Exception('Could not find test result filepath') 107 108 return result_file_path 109 110 def run_shell_command( 111 self, 112 shell_command: str, 113 ) -> subprocess.CompletedProcess: 114 return subprocess.run( 115 '. build/envsetup.sh && ' 116 'lunch aosp_cf_x86_64_pc-userdebug && ' 117 f'{shell_command}', 118 env=self.test_env, 119 cwd=self.src_root_path, 120 shell=True, 121 check=False, 122 stderr=subprocess.STDOUT, 123 stdout=subprocess.PIPE, 124 ) 125 126 def assert_test_result_equal(self, result1, result2): 127 self.assertEqual(set(result1.keys()), set(result2.keys())) 128 129 print( 130 '{0:100} {1:20} {2}'.format( 131 'Test', 'Atest Standard Mode', 'Atest Bazel Mode' 132 ) 133 ) 134 count = 0 135 for k, v in result1.items(): 136 if v != result2[k]: 137 count += 1 138 print('{0:100} {1:20} {2}'.format(k, v, result2[k])) 139 print( 140 f'Total Number of Host Unit Test: {len(result1)}. {count} tests ' 141 'have different results.' 142 ) 143 144 self.assertEqual(count, 0) 145 146 147def parse_standard_result(result_file: Path) -> Dict[str, str]: 148 result = {} 149 with result_file.open('r') as f: 150 json_result = json.loads(f.read()) 151 for k, v in json_result['test_runner']['AtestTradefedTestRunner'].items(): 152 name = k.split()[-1] 153 if name in result: 154 raise Exception(f'Duplicated Test Target: `{name}`') 155 156 # Test passed when there are no failed test cases and no errors. 157 result[name] = ( 158 'PASSED' 159 if v['summary']['FAILED'] == 0 and not v.get('ERROR') 160 else 'FAILED' 161 ) 162 return result 163 164 165def parse_bazel_result(result_file: Path) -> Dict[str, str]: 166 result = {} 167 with result_file.open('r') as f: 168 content = f.read() 169 events = content.splitlines() 170 171 for e in events: 172 json_event = json.loads(e) 173 if 'testSummary' in json_event['id']: 174 name = ( 175 json_event['id']['testSummary']['label'] 176 .split(':')[-1] 177 .removesuffix('_host') 178 ) 179 result[name] = json_event['testSummary']['overallStatus'] 180 return result 181 182 183if __name__ == '__main__': 184 unittest.main(verbosity=2) 185