1#!/usr/bin/env python 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17"""Unit tests for apexer.""" 18 19import hashlib 20import json 21import logging 22import os 23import shutil 24import stat 25import subprocess 26import tempfile 27import unittest 28from importlib import resources 29from zipfile import ZipFile 30 31from apex_manifest import ValidateApexManifest 32from apex_manifest import ParseApexManifest 33 34logger = logging.getLogger(__name__) 35 36TEST_APEX = "com.android.example.apex" 37TEST_APEX_LEGACY = "com.android.example-legacy.apex" 38TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex" 39TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex" 40 41TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem") 42TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem") 43TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8") 44TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey") 45TEST_MANIFEST_JSON = os.path.join("testdata", "manifest.json") 46 47def run(args, verbose=None, **kwargs): 48 """Creates and returns a subprocess.Popen object. 49 50 Args: 51 args: The command represented as a list of strings. 52 verbose: Whether the commands should be shown. Default to the global 53 verbosity if unspecified. 54 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 55 stdin, etc. stdout and stderr will default to subprocess.PIPE and 56 subprocess.STDOUT respectively unless caller specifies any of them. 57 universal_newlines will default to True, as most of the users in 58 releasetools expect string output. 59 60 Returns: 61 A subprocess.Popen object. 62 """ 63 if 'stdout' not in kwargs and 'stderr' not in kwargs: 64 kwargs['stdout'] = subprocess.PIPE 65 kwargs['stderr'] = subprocess.STDOUT 66 if 'universal_newlines' not in kwargs: 67 kwargs['universal_newlines'] = True 68 # Don't log any if caller explicitly says so. 69 if DEBUG_TEST: 70 print("\nRunning: \n%s\n" % " ".join(args)) 71 if verbose: 72 logger.info(" Running: \"%s\"", " ".join(args)) 73 return subprocess.Popen(args, **kwargs) 74 75 76def run_host_command(args, verbose=None, **kwargs): 77 host_build_top = os.environ.get("ANDROID_BUILD_TOP") 78 if host_build_top: 79 host_command_dir = os.path.join(host_build_top, "out/host/linux-x86/bin") 80 args[0] = os.path.join(host_command_dir, args[0]) 81 return run_and_check_output(args, verbose, **kwargs) 82 83 84def run_and_check_output(args, verbose=None, **kwargs): 85 """Runs the given command and returns the output. 86 87 Args: 88 args: The command represented as a list of strings. 89 verbose: Whether the commands should be shown. Default to the global 90 verbosity if unspecified. 91 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 92 stdin, etc. stdout and stderr will default to subprocess.PIPE and 93 subprocess.STDOUT respectively unless caller specifies any of them. 94 95 Returns: 96 The output string. 97 98 Raises: 99 ExternalError: On non-zero exit from the command. 100 """ 101 proc = run(args, verbose=verbose, **kwargs) 102 output, _ = proc.communicate() 103 if output is None: 104 output = "" 105 # Don't log any if caller explicitly says so. 106 if verbose: 107 logger.info("%s", output.rstrip()) 108 if proc.returncode != 0: 109 raise RuntimeError( 110 "Failed to run command '{}' (exit code {}):\n{}".format( 111 args, proc.returncode, output)) 112 return output 113 114 115def get_sha1sum(file_path): 116 h = hashlib.sha256() 117 118 with open(file_path, 'rb') as file: 119 while True: 120 # Reading is buffered, so we can read smaller chunks. 121 chunk = file.read(h.block_size) 122 if not chunk: 123 break 124 h.update(chunk) 125 126 return h.hexdigest() 127 128 129def round_up(size, unit): 130 assert unit & (unit - 1) == 0 131 return (size + unit - 1) & (~(unit - 1)) 132 133# In order to debug test failures, set DEBUG_TEST to True and run the test from 134# local workstation bypassing atest, e.g.: 135# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test 136# 137# the test will print out the command used, and the temporary files used by the 138# test. You need to compare e.g. /tmp/test_simple_apex_input_XXXXXXXX.apex with 139# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to check where they are 140# different. 141# A simple script to analyze the differences: 142# 143# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex 144# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex 145# 146# cd ~/tmp/ 147# rm -rf input output 148# mkdir input output 149# unzip ${FILE_INPUT} -d input/ 150# unzip ${FILE_OUTPUT} -d output/ 151# 152# diff -r input/ output/ 153# 154# For analyzing binary diffs I had mild success using the vbindiff utility. 155DEBUG_TEST = False 156 157 158class ApexerRebuildTest(unittest.TestCase): 159 def setUp(self): 160 self._to_cleanup = [] 161 self._get_host_tools() 162 163 def tearDown(self): 164 if not DEBUG_TEST: 165 for i in self._to_cleanup: 166 if os.path.isdir(i): 167 shutil.rmtree(i, ignore_errors=True) 168 else: 169 os.remove(i) 170 del self._to_cleanup[:] 171 else: 172 print(self._to_cleanup) 173 174 def _get_host_tools(self): 175 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_") 176 self._to_cleanup.append(dir_name) 177 with resources.files("apexer_test").joinpath("apexer_test_host_tools.zip").open('rb') as f: 178 with ZipFile(f, 'r') as zip_obj: 179 zip_obj.extractall(path=dir_name) 180 181 files = {} 182 for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid", 183 "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static", 184 "signapk.jar", "android.jar", "make_erofs", "fsck.erofs", "conv_apex_manifest"]: 185 file_path = os.path.join(dir_name, "bin", i) 186 if os.path.exists(file_path): 187 os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR) 188 files[i] = file_path 189 else: 190 files[i] = i 191 self.host_tools = files 192 self.host_tools_path = os.path.join(dir_name, "bin") 193 194 path = self.host_tools_path 195 if "PATH" in os.environ: 196 path += ":" + os.environ["PATH"] 197 os.environ["PATH"] = path 198 199 ld_library_path = os.path.join(dir_name, "lib64") 200 if "LD_LIBRARY_PATH" in os.environ: 201 ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"] 202 if "ANDROID_HOST_OUT" in os.environ: 203 ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64") 204 os.environ["LD_LIBRARY_PATH"] = ld_library_path 205 206 def _extract_resource(self, resource_name): 207 with ( 208 resources.files("apexer_test").joinpath(resource_name).open('rb') as f, 209 tempfile.NamedTemporaryFile(prefix=resource_name.replace('/', '_'), delete=False) as f2, 210 ): 211 self._to_cleanup.append(f2.name) 212 shutil.copyfileobj(f, f2) 213 return f2.name 214 215 def _get_container_files(self, apex_file_path): 216 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_container_files_") 217 self._to_cleanup.append(dir_name) 218 with ZipFile(apex_file_path, 'r') as zip_obj: 219 zip_obj.extractall(path=dir_name) 220 files = {} 221 for i in ["apex_manifest.json", "apex_manifest.pb", 222 "apex_build_info.pb", "assets", 223 "apex_payload.img", "apex_payload.zip"]: 224 file_path = os.path.join(dir_name, i) 225 if os.path.exists(file_path): 226 files[i] = file_path 227 self.assertIn("apex_manifest.pb", files) 228 self.assertIn("apex_build_info.pb", files) 229 230 image_file = None 231 if "apex_payload.img" in files: 232 image_file = files["apex_payload.img"] 233 elif "apex_payload.zip" in files: 234 image_file = files["apex_payload.zip"] 235 self.assertIsNotNone(image_file) 236 files["apex_payload"] = image_file 237 238 return files 239 240 def _extract_payload_from_img(self, img_file_path): 241 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_") 242 self._to_cleanup.append(dir_name) 243 cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path] 244 run_host_command(cmd) 245 246 # Remove payload files added by apexer and e2fs tools. 247 for i in ["apex_manifest.json", "apex_manifest.pb"]: 248 if os.path.exists(os.path.join(dir_name, i)): 249 os.remove(os.path.join(dir_name, i)) 250 if os.path.isdir(os.path.join(dir_name, "lost+found")): 251 shutil.rmtree(os.path.join(dir_name, "lost+found")) 252 return dir_name 253 254 def _extract_payload(self, apex_file_path): 255 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_extracted_payload_") 256 self._to_cleanup.append(dir_name) 257 cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"], 258 "--fsckerofs_path", self.host_tools["fsck.erofs"], "extract", 259 apex_file_path, dir_name] 260 run_host_command(cmd) 261 262 # Remove payload files added by apexer and e2fs tools. 263 for i in ["apex_manifest.json", "apex_manifest.pb"]: 264 if os.path.exists(os.path.join(dir_name, i)): 265 os.remove(os.path.join(dir_name, i)) 266 if os.path.isdir(os.path.join(dir_name, "lost+found")): 267 shutil.rmtree(os.path.join(dir_name, "lost+found")) 268 return dir_name 269 270 def _run_apexer(self, container_files, payload_dir, args=[]): 271 unsigned_payload_only = False 272 payload_only = False 273 if "--unsigned_payload_only" in args: 274 unsigned_payload_only = True 275 if unsigned_payload_only or "--payload_only" in args: 276 payload_only = True 277 278 os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path + 279 ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin") 280 cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"] 281 if DEBUG_TEST: 282 cmd.append('-v') 283 cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]]) 284 cmd.extend(["--android_jar_path", self.host_tools["android.jar"]]) 285 cmd.extend(["--manifest", container_files["apex_manifest.pb"]]) 286 if "apex_manifest.json" in container_files: 287 cmd.extend(["--manifest_json", container_files["apex_manifest.json"]]) 288 cmd.extend(["--build_info", container_files["apex_build_info.pb"]]) 289 if not payload_only and "assets" in container_files: 290 cmd.extend(["--assets_dir", container_files["assets"]]) 291 if not unsigned_payload_only: 292 cmd.extend(["--key", self._extract_resource(TEST_PRIVATE_KEY)]) 293 cmd.extend(["--pubkey", self._extract_resource(TEST_AVB_PUBLIC_KEY)]) 294 cmd.extend(args) 295 296 # Decide on output file name 297 apex_suffix = ".apex.unsigned" 298 if payload_only: 299 apex_suffix = ".payload" 300 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=apex_suffix) 301 os.close(fd) 302 self._to_cleanup.append(fn) 303 cmd.extend([payload_dir, fn]) 304 305 run_host_command(cmd) 306 return fn 307 308 def _get_java_toolchain(self): 309 java_toolchain = "java" 310 if os.path.isfile("prebuilts/jdk/jdk21/linux-x86/bin/java"): 311 java_toolchain = "prebuilts/jdk/jdk21/linux-x86/bin/java" 312 elif os.path.isfile("/jdk/jdk21/linux-x86/bin/java"): 313 java_toolchain = "/jdk/jdk21/linux-x86/bin/java" 314 elif "ANDROID_JAVA_TOOLCHAIN" in os.environ: 315 java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java") 316 elif "ANDROID_JAVA_HOME" in os.environ: 317 java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java") 318 elif "JAVA_HOME" in os.environ: 319 java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java") 320 321 java_dep_lib = os.environ["LD_LIBRARY_PATH"] 322 if "ANDROID_HOST_OUT" in os.environ: 323 java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64") 324 if "ANDROID_BUILD_TOP" in os.environ: 325 java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"], 326 "out/host/linux-x86/lib64") 327 328 return [java_toolchain, java_dep_lib] 329 330 def _sign_apk_container(self, unsigned_apex): 331 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".apex") 332 os.close(fd) 333 self._to_cleanup.append(fn) 334 java_toolchain, java_dep_lib = self._get_java_toolchain() 335 cmd = [ 336 java_toolchain, 337 "-Djava.library.path=" + java_dep_lib, 338 "-jar", self.host_tools['signapk.jar'], 339 "-a", "4096", "--align-file-size", 340 self._extract_resource(TEST_X509_KEY), 341 self._extract_resource(TEST_PK8_KEY), 342 unsigned_apex, fn] 343 run_and_check_output(cmd) 344 return fn 345 346 def _sign_payload(self, container_files, unsigned_payload): 347 fd, signed_payload = \ 348 tempfile.mkstemp(prefix=self._testMethodName+"_repacked_", suffix=".payload") 349 os.close(fd) 350 self._to_cleanup.append(signed_payload) 351 shutil.copyfile(unsigned_payload, signed_payload) 352 353 cmd = ['avbtool'] 354 cmd.append('add_hashtree_footer') 355 cmd.append('--do_not_generate_fec') 356 cmd.extend(['--algorithm', 'SHA256_RSA4096']) 357 cmd.extend(['--hash_algorithm', 'sha256']) 358 cmd.extend(['--key', self._extract_resource(TEST_PRIVATE_KEY)]) 359 manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"]) 360 ValidateApexManifest(manifest_apex) 361 cmd.extend(['--prop', 'apex.key:' + manifest_apex.name]) 362 # Set up the salt based on manifest content which includes name 363 # and version 364 salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest() 365 cmd.extend(['--salt', salt]) 366 cmd.extend(['--image', signed_payload]) 367 run_and_check_output(cmd) 368 369 return signed_payload 370 371 def _verify_payload(self, payload): 372 """Verifies that the payload is properly signed by avbtool""" 373 cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"] 374 run_and_check_output(cmd) 375 376 def _run_build_test(self, apex_name): 377 apex_file_path = self._extract_resource(apex_name + ".apex") 378 if DEBUG_TEST: 379 fd, fn = tempfile.mkstemp(prefix=self._testMethodName+"_input_", suffix=".apex") 380 os.close(fd) 381 shutil.copyfile(apex_file_path, fn) 382 self._to_cleanup.append(fn) 383 container_files = self._get_container_files(apex_file_path) 384 payload_dir = self._extract_payload(apex_file_path) 385 repack_apex_file_path = self._run_apexer(container_files, payload_dir) 386 resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path) 387 self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path)) 388 389 def test_simple_apex(self): 390 self._run_build_test(TEST_APEX) 391 392 def test_legacy_apex(self): 393 self._run_build_test(TEST_APEX_LEGACY) 394 395 def test_output_payload_only(self): 396 """Assert that payload-only output from apexer is same as the payload we get by unzipping 397 apex. 398 """ 399 apex_file_path = self._extract_resource(TEST_APEX + ".apex") 400 container_files = self._get_container_files(apex_file_path) 401 payload_dir = self._extract_payload(apex_file_path) 402 payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"]) 403 self._verify_payload(payload_only_file_path) 404 self.assertEqual(get_sha1sum(payload_only_file_path), 405 get_sha1sum(container_files["apex_payload"])) 406 407 def test_output_unsigned_payload_only(self): 408 """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is 409 same as the payload we get by unzipping apex. 410 """ 411 apex_file_path = self._extract_resource(TEST_APEX + ".apex") 412 container_files = self._get_container_files(apex_file_path) 413 payload_dir = self._extract_payload(apex_file_path) 414 unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir, 415 ["--unsigned_payload_only"]) 416 with self.assertRaises(RuntimeError) as error: 417 self._verify_payload(unsigned_payload_only_file_path) 418 self.assertIn("Given image does not look like a vbmeta image", str(error.exception)) 419 signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path) 420 self.assertEqual(get_sha1sum(signed_payload), 421 get_sha1sum(container_files["apex_payload"])) 422 423 def test_apex_with_logging_parent(self): 424 self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT) 425 426 def test_apex_with_overridden_package_name(self): 427 self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME) 428 429 def test_conv_apex_manifest(self): 430 # .pb generation from json 431 manifest_json_path = self._extract_resource(TEST_MANIFEST_JSON) 432 433 fd, fn = tempfile.mkstemp(prefix=self._testMethodName + "_manifest_", suffix=".pb") 434 os.close(fd) 435 self._to_cleanup.append(fn) 436 cmd = [ 437 "conv_apex_manifest", 438 "proto", 439 manifest_json_path, 440 "-o", fn] 441 run_and_check_output(cmd) 442 443 with open(manifest_json_path) as fd_json: 444 manifest_json = json.load(fd_json) 445 manifest_apex = ParseApexManifest(fn) 446 ValidateApexManifest(manifest_apex) 447 448 self.assertEqual(manifest_apex.name, manifest_json["name"]) 449 self.assertEqual(manifest_apex.version, manifest_json["version"]) 450 451 # setprop check on already generated .pb 452 next_version = 20 453 cmd = [ 454 "conv_apex_manifest", 455 "setprop", 456 "version", str(next_version), 457 fn] 458 run_and_check_output(cmd) 459 460 manifest_apex = ParseApexManifest(fn) 461 ValidateApexManifest(manifest_apex) 462 463 self.assertEqual(manifest_apex.version, next_version) 464 465 466 467if __name__ == '__main__': 468 unittest.main(verbosity=2) 469