#!/usr/bin/python3
# SPDX-License-Identifier: GPL-2.0
# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This utility generates a single lcov tracefile from a gcov tar file."""

import argparse
import collections
import fnmatch
import glob
import json
import logging
import os
import pathlib
import re
import shlex
import shutil
import subprocess
import sys
import tarfile
from typing import Dict
from typing import List
from typing import Optional


LCOV = "lcov"

# Relative to the root of the source tree.
OUTPUT_COV_DIR = os.path.join("out", "coverage")

BUILD_CONFIG_CONSTANTS_PATH = os.path.join("common", "build.config.constants")

PREBUILT_CLANG_DIR = os.path.join("prebuilts", "clang", "host", "linux-x86")

PREBUILT_LLVM_COV_PATH_FORMAT = os.path.join(
    PREBUILT_CLANG_DIR, "clang-%s", "bin", "llvm-cov"
)

PREBUILT_STABLE_LLVM_COV_PATH = os.path.join(
    PREBUILT_CLANG_DIR, "llvm-binutils-stable", "llvm-cov"
)

EXCLUDED_FILES = [
    "*/security/selinux/av_permissions.h",
    "*/security/selinux/flask.h",
]

# Glob pattern used to pick gcov tar files when a directory is given with -t.
GCOV_TAR_PATTERN = "*kernel_coverage_*.tar.gz"


def create_llvm_gcov_sh(
    llvm_cov_filename: str,
    llvm_gcov_sh_filename: str,
) -> None:
  """Create a shell script that is compatible with gcov.

  The script simply forwards all of its arguments to `llvm-cov gcov`.

  Args:
    llvm_cov_filename: The absolute path to llvm-cov.
    llvm_gcov_sh_filename: The path to the script to be created. Missing parent
      directories are created.
  """
  file_path = pathlib.Path(llvm_gcov_sh_filename)
  file_path.parent.mkdir(parents=True, exist_ok=True)
  # Quote the path so that directories containing spaces or shell
  # metacharacters do not break the generated script.
  quoted_llvm_cov = shlex.quote(llvm_cov_filename)
  file_path.write_text(f'#!/bin/bash\nexec {quoted_llvm_cov} gcov "$@"')
  os.chmod(llvm_gcov_sh_filename, 0o755)


def generate_lcov_tracefile(
    gcov_dir: str,
    kernel_source: str,
    gcov_filename: str,
    tracefile_filename: str,
    included_files: Optional[List[List[str]]],
) -> None:
  """Call lcov to create tracefile based on gcov data files.

  Args:
    gcov_dir: Directory that contains the extracted gcov data files as retrieved
      from debugfs.
    kernel_source: Directory containing the kernel source same as what was used
      to build system under test.
    gcov_filename: The absolute path to gcov or a compatible script.
    tracefile_filename: The name of tracefile to create.
    included_files: List of source file patterns to include in tracefile. Each
      entry is a one-element list, as produced by argparse with
      action="append" and nargs=1. Can be None or empty, in which case all
      source is included.
  """
  logging.info("Running lcov on %s", gcov_dir)

  # Build an argument vector instead of a shell string so that paths and
  # patterns are passed to lcov verbatim, without shell word splitting or
  # glob expansion.
  lcov_cmd = [
      LCOV,
      "-q",
      "--ignore-errors=source",
      "--rc",
      "branch_coverage=1",
      "-b",
      kernel_source,
      "-d",
      gcov_dir,
      "--gcov-tool",
      gcov_filename,
  ]
  for pattern in EXCLUDED_FILES:
    lcov_cmd += ["--exclude", pattern]
  for pattern in included_files or []:
    lcov_cmd += ["--include", pattern[0]]
  lcov_cmd += [
      "--ignore-errors",
      "gcov,gcov,unused,unused",
      "--capture",
      "-o",
      tracefile_filename,
  ]

  result = subprocess.run(lcov_cmd, check=False)
  if result.returncode != 0:
    logging.error("%s exited with status %d", LCOV, result.returncode)


def update_symlink_from_mapping(
    filepath: str, prefix_mappings: Dict[str, List[str]]
) -> bool:
  """Update symbolic link based on prefix mappings.

  It will attempt to update the given symbolic link based on the prefix
  mappings. For every "from" prefix that matches replace with the new "to"
  value. If the resulting path doesn't exist, try the next.

  Args:
    filepath: Path of symbolic link to update.
    prefix_mappings: A multimap where the key is the "from" prefix to match, and
      the value is an array of "to" values to attempt to replace with.

  Returns:
    True or false depending on the whether symbolic link was successfully
    updated to a new path that exists.
  """
  link_target = os.readlink(filepath)
  for old_prefix, new_prefix_list in prefix_mappings.items():
    if not link_target.startswith(old_prefix):
      continue
    # Only the leading prefix is swapped; later occurrences of the same text
    # inside the path must be left untouched.
    remainder = link_target[len(old_prefix):]
    for new_prefix in new_prefix_list:
      new_target = os.path.abspath(new_prefix + remainder)
      if not os.path.exists(new_target):
        continue
      os.unlink(filepath)  # Remove the old symbolic link
      os.symlink(new_target, filepath)  # Create the updated link
      return True

  return False


def correct_symlinks_in_directory(
    directory: str, prefix_mappings: Dict[str, List[str]]
) -> None:
  """Recursively traverses a directory, updating symbolic links.

  Replaces 'old_prefix' in the link destination with 'new_prefix'. Exits the
  script with an error if a link cannot be updated with any of the mappings.

  Args:
    directory: The root directory to traverse.
    prefix_mappings: Dictionary where the keys are the old prefixes and the
      values are lists of candidate new prefixes.
  """
  logging.info("Fixing up symbolic links in %s", directory)

  for root, _, files in os.walk(directory):
    for filename in files:
      filepath = os.path.join(root, filename)
      if not os.path.islink(filepath):
        continue
      if not update_symlink_from_mapping(filepath, prefix_mappings):
        logging.error(
            "Unable to update link at %s with any prefix mappings: %s",
            filepath,
            prefix_mappings,
        )
        sys.exit(-1)


def find_most_recent_tarfile(
    path: str, pattern: str = "*.tar.gz"
) -> Optional[str]:
  """Attempts to find a valid tar file given the location.

  If location is a directory finds the most recent tarfile, or if location is
  a valid tar file returns it. If neither of these, returns None.

  Args:
    path (str): The path to either a tarfile or a directory.
    pattern (str, optional): Glob pattern for matching tarfiles. Defaults to
      "*.tar.gz".

  Returns:
    str: The path to the most recent tarfile found, or the original path
    if it was a valid tarfile. None if no matching tarfiles are found.
  """
  if os.path.isfile(path):
    # A plain file is accepted only if it really is a tar archive.
    return path if tarfile.is_tarfile(path) else None

  if not os.path.isdir(path):
    return None  # Path is neither a file nor a directory

  candidates = [
      os.path.join(root, name)
      for root, _, files in os.walk(path)
      for name in files
      if fnmatch.fnmatch(name, pattern)
  ]
  if not candidates:
    return None  # No tarfiles found in the directory

  # Most recent is decided by modification time.
  return max(candidates, key=os.path.getmtime)


def make_absolute(path: str, base_dir: str) -> str:
  """Return path unchanged if it is absolute, else joined onto base_dir."""
  if os.path.isabs(path):
    return path

  return os.path.join(base_dir, path)


def append_slash(path: Optional[str]) -> Optional[str]:
  """Return path with a trailing "/" appended if it does not have one.

  None and the empty string are returned unchanged.
  """
  if path and not path.endswith("/"):
    path += "/"
  return path


def update_multimap_from_json(
    json_file: str, base_dir: str, result_multimap: collections.defaultdict
) -> None:
  """Reads 'to' and 'from' fields from a JSON file and updates a multimap.

  'from' refers to a bazel sandbox directory.
  'to' refers to the output directory of gcno files.
  The multimap is implemented as a dictionary of lists allowing multiple 'to'
  values for each 'from' key.

  Sample input:
  [
    {
      "from": "/sandbox/1/execroot/_main/out/android-mainline/common",
      "to": "bazel-out/k8-fastbuild/bin/common/kernel_x86_64/kernel_x86_64_gcno"
    },
    {
      "from": "/sandbox/2/execroot/_main/out/android-mainline/common",
      "to": "bazel-out/k8-fastbuild/bin/common-modules/virtual-device/virtual_device_x86_64/virtual_device_x86_64_gcno"
    }
  ]

  Entries missing either field (or with an empty value) are skipped.

  Args:
    json_file: The path to the JSON file.
    base_dir: Used if either of the 'to' or 'from' paths are relative to make
      them absolute by prepending this base_dir value.
    result_multimap: A multimap that is updated in place with every 'to' and
      'from' found.
  """
  with open(json_file, "r") as file:
    data = json.load(file)

  for item in data:
    to_value = append_slash(item.get("to"))
    from_value = append_slash(item.get("from"))
    if not (to_value and from_value):
      continue
    to_value = make_absolute(to_value, base_dir)
    from_value = make_absolute(from_value, base_dir)
    result_multimap[from_value].append(to_value)


def read_gcno_mapping_files(
    search_dir_pattern: str,
    base_dir: str,
    result_multimap: collections.defaultdict
) -> None:
  """Search a directory for gcno_mapping.

  Args:
    search_dir_pattern: Directory (or glob pattern of directories) in which to
      look for gcno_mapping.*.json files.
    base_dir: Base directory used to make relative paths in the mapping files
      absolute.
    result_multimap: A multimap that is updated in place with the mappings
      found. An error is logged if no mapping file matches.
  """
  found = False
  pattern = os.path.join(search_dir_pattern, "gcno_mapping.*.json")
  # NOTE: with recursive=False a "**" component in the pattern behaves like
  # "*", i.e. it matches exactly one directory level.
  for filepath in glob.iglob(pattern, recursive=False):
    found = True
    logging.info("Reading %s", filepath)
    update_multimap_from_json(filepath, base_dir, result_multimap)

  if not found:
    logging.error("No gcno_mapping in %s", search_dir_pattern)


def read_gcno_dir(
    gcno_dir: str, result_multimap: collections.defaultdict
) -> None:
  """Read a directory containing gcno_mapping and gcno files.

  Every 'from' prefix found in the directory's mapping files is mapped to the
  directory itself, ignoring the 'to' values recorded in the files.

  Args:
    gcno_dir: Directory holding gcno_mapping.*.json and the gcno files.
    result_multimap: A multimap that is updated in place.
  """
  multimap = collections.defaultdict(list)
  read_gcno_mapping_files(gcno_dir, gcno_dir, multimap)

  to_value = append_slash(os.path.abspath(gcno_dir))
  for from_value in multimap:
    result_multimap[from_value].append(to_value)


def _strip_last_underscore_component(name: str) -> str:
  """Drop the final "_"-separated component; no-op if there is no "_"."""
  head, sep, _ = name.rpartition("_")
  return head if sep else name


def get_testname_from_filename(file_path: str) -> str:
  """Derive the test name from the file name of a gcov tar file.

  If the name contains "_kernel_coverage", everything from that marker on is
  discarded first. Then the last "_"-separated component of what remains is
  dropped. A name without any "_" is returned as is.

  Args:
    file_path: Path of the gcov tar file; only the base name is used.

  Returns:
    The derived test name.
  """
  filename = os.path.basename(file_path)
  marker_pos = filename.find("_kernel_coverage")
  if marker_pos >= 0:
    filename = filename[:marker_pos]
  return _strip_last_underscore_component(filename)


def unpack_gcov_tar(file_path: str, output_dir: str) -> str:
  """Unpack the tar file into the specified directory.

  The archive is extracted into a sub-directory of output_dir named after the
  test. Any existing sub-directory with that name is removed first.

  Args:
    file_path: The path of the tar file to be unpacked.
    output_dir: The root directory where the unpacked folder will reside.

  Returns:
    The path of extracted data.
  """
  testname = get_testname_from_filename(file_path)
  logging.info(
      "Unpacking %s for test %s...", os.path.basename(file_path), testname
  )

  test_dest_dir = os.path.join(output_dir, testname)
  if os.path.exists(test_dest_dir):
    shutil.rmtree(test_dest_dir)
  os.makedirs(test_dest_dir)
  shutil.unpack_archive(file_path, test_dest_dir, "tar")
  return test_dest_dir


def get_parent_path(path: str, levels_up: int) -> str:
  """Goes up a specified number of levels from a given path.

  Args:
    path: The path to find desired ancestor.
    levels_up: The number of levels up to go.

  Returns:
    The desired ancestor of the given path.
  """
  ancestor = pathlib.Path(path)
  for _ in range(levels_up):
    ancestor = ancestor.parent
  return str(ancestor)


def get_kernel_repo_dir() -> str:
  """Return the kernel repo root, derived from this script's location."""
  # Assume this script is in a kernel source tree:
  # kernel_repo/kernel/tests/tools/<this_script>
  return get_parent_path(os.path.abspath(__file__), 4)


def load_kernel_clang_version(repo_dir: str) -> str:
  """Load CLANG_VERSION from build.config.constants.

  Args:
    repo_dir: Root directory of the kernel repo.

  Returns:
    The value of the last CLANG_VERSION assignment in the file, or an empty
    string if the file does not exist or has no such assignment.
  """
  config_path = os.path.join(repo_dir, BUILD_CONFIG_CONSTANTS_PATH)
  if not os.path.isfile(config_path):
    return ""
  clang_version = ""
  with open(config_path, "r") as config_file:
    for line in config_file:
      match = re.fullmatch(r"\s*CLANG_VERSION=(\S*)\s*", line)
      if match:
        clang_version = match.group(1)
  return clang_version


class Config:
  """The input and output paths of this script.

  Paths not supplied to the constructor are computed lazily on first access,
  relative to the kernel repo directory.
  """

  def __init__(self, repo_dir: str, llvm_cov_path: str, tmp_dir: str):
    """Each argument can be empty."""
    self._repo_dir = os.path.abspath(repo_dir) if repo_dir else None
    self._llvm_cov_path = (
        os.path.abspath(llvm_cov_path) if llvm_cov_path else None
    )
    self._tmp_dir = os.path.abspath(tmp_dir) if tmp_dir else None
    self._repo_out_dir = None

  @property
  def repo_dir(self) -> str:
    """Root of the kernel repo; defaults to one derived from this script."""
    if not self._repo_dir:
      self._repo_dir = get_kernel_repo_dir()
    return self._repo_dir

  def _get_repo_path(self, rel_path: str) -> str:
    """Return rel_path under the repo; exit the script if it is missing."""
    repo_path = os.path.join(self.repo_dir, rel_path)
    if not os.path.exists(repo_path):
      logging.error(
          "%s does not exist. If this script is not in the source directory,"
          " specify --repo-dir. If you do not have full kernel source,"
          " specify --llvm-cov, --gcno-dir, and --tmp-dir.",
          repo_path,
      )
      sys.exit(-1)
    return repo_path

  @property
  def llvm_cov_path(self) -> str:
    """Path to llvm-cov; defaults to a prebuilt inside the repo."""
    if not self._llvm_cov_path:
      # Load the clang version in kernel repo,
      # or use the stable version in platform repo.
      clang_version = load_kernel_clang_version(self.repo_dir)
      if clang_version:
        rel_path = PREBUILT_LLVM_COV_PATH_FORMAT % clang_version
      else:
        rel_path = PREBUILT_STABLE_LLVM_COV_PATH
      self._llvm_cov_path = self._get_repo_path(rel_path)
    return self._llvm_cov_path

  @property
  def repo_out_dir(self) -> str:
    """The "out" directory of the repo; it must exist."""
    if not self._repo_out_dir:
      self._repo_out_dir = self._get_repo_path("out")
    return self._repo_out_dir

  @property
  def tmp_dir(self) -> str:
    """Directory for temporary files; defaults to <repo_dir>/out/coverage."""
    if not self._tmp_dir:
      # Temporary directory does not have to exist.
      self._tmp_dir = os.path.join(self.repo_dir, OUTPUT_COV_DIR)
    return self._tmp_dir

  @property
  def llvm_gcov_sh_path(self) -> str:
    """Path of the generated gcov-compatible wrapper script."""
    return os.path.join(self.tmp_dir, "tmp", "llvm-gcov.sh")


def main() -> None:
  """Parse the command line and generate the lcov tracefile."""
  arg_parser = argparse.ArgumentParser(
      description="Generate lcov tracefiles from gcov file dumps"
  )

  arg_parser.add_argument(
      "-t",
      dest="tar_location",
      required=True,
      help=(
          "Either a path to a gcov tar file or a directory that contains gcov"
          " tar file(s). The gcov tar file is expected to be created from"
          " Tradefed. If a directory is used, will search the entire directory"
          " for files matching " + GCOV_TAR_PATTERN + " and select the most"
          " recent one."
      ),
  )
  arg_parser.add_argument(
      "-o",
      dest="out_file",
      required=False,
      help="Name of output tracefile generated. Default: cov.info",
      default="cov.info",
  )
  arg_parser.add_argument(
      "--include",
      action="append",
      nargs=1,
      required=False,
      help=(
          "File pattern of source file(s) to include in generated tracefile."
          " Multiple patterns can be specified by using multiple --include"
          " command line switches. If no includes are specified all source is"
          " included."
      ),
  )
  arg_parser.add_argument(
      "--repo-dir",
      required=False,
      help="Root directory of kernel source"
  )
  arg_parser.add_argument(
      "--dist-dir",
      dest="dist_dirs",
      action="append",
      default=[],
      required=False,
      help="Dist directory containing gcno mapping files"
  )
  arg_parser.add_argument(
      "--gcno-dir",
      dest="gcno_dirs",
      action="append",
      default=[],
      required=False,
      help="Path to an extracted .gcno.tar.gz"
  )
  arg_parser.add_argument(
      "--llvm-cov",
      required=False,
      help=(
          "Path to llvm-cov. Default: "
          + os.path.join("<repo_dir>", PREBUILT_LLVM_COV_PATH_FORMAT % "*")
          + " or " + os.path.join("<repo_dir>", PREBUILT_STABLE_LLVM_COV_PATH)
      )
  )
  arg_parser.add_argument(
      "--tmp-dir",
      required=False,
      help=(
          "Path to the directory where the temporary files are created."
          " Default: " + os.path.join("<repo_dir>", OUTPUT_COV_DIR)
      )
  )
  arg_parser.add_argument(
      "--verbose",
      action="store_true",
      default=False,
      help="Enable verbose logging",
  )

  args = arg_parser.parse_args()

  if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
  else:
    logging.basicConfig(level=logging.WARNING)

  if shutil.which(LCOV) is None:
    logging.error(
        "%s is not found and is required for this script. Please install from:",
        LCOV,
    )
    logging.critical("       https://github.com/linux-test-project/lcov")
    sys.exit(-1)

  if args.repo_dir and not os.path.isdir(args.repo_dir):
    logging.error("%s is not a directory.", args.repo_dir)
    sys.exit(-1)

  if args.llvm_cov and not os.path.isfile(args.llvm_cov):
    logging.error("%s is not a file.", args.llvm_cov)
    sys.exit(-1)

  for gcno_dir in args.gcno_dirs + args.dist_dirs:
    if not os.path.isdir(gcno_dir):
      logging.error("%s is not a directory.", gcno_dir)
      sys.exit(-1)

  config = Config(args.repo_dir, args.llvm_cov, args.tmp_dir)

  gcno_mappings = collections.defaultdict(list)
  if not args.gcno_dirs and not args.dist_dirs:
    # Nothing was specified on the command line: fall back to the dist
    # directories under the repo's out directory.
    dist_dir_pattern = os.path.join(config.repo_out_dir, "**", "dist")
    read_gcno_mapping_files(dist_dir_pattern, config.repo_dir, gcno_mappings)

  for dist_dir in args.dist_dirs:
    read_gcno_mapping_files(dist_dir, config.repo_dir, gcno_mappings)

  for gcno_dir in args.gcno_dirs:
    read_gcno_dir(gcno_dir, gcno_mappings)

  if not gcno_mappings:
    # read_gcno_mapping_files prints the error messages
    sys.exit(-1)

  tar_file = find_most_recent_tarfile(
      args.tar_location, pattern=GCOV_TAR_PATTERN
  )
  if tar_file is None:
    logging.error("Unable to find a gcov tar under %s", args.tar_location)
    sys.exit(-1)

  gcov_dir = unpack_gcov_tar(tar_file, config.tmp_dir)
  correct_symlinks_in_directory(gcov_dir, gcno_mappings)

  create_llvm_gcov_sh(
      config.llvm_cov_path,
      config.llvm_gcov_sh_path,
  )

  generate_lcov_tracefile(
      gcov_dir,
      config.repo_dir,
      config.llvm_gcov_sh_path,
      args.out_file,
      args.include,
  )


if __name__ == "__main__":
  main()