xref: /aosp_15_r20/system/apex/apexer/apexer_test.py (revision 33f3758387333dbd2962d7edbd98681940d895da)
1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apexer."""
18
19import hashlib
20import json
21import logging
22import os
23import shutil
24import stat
25import subprocess
26import tempfile
27import unittest
28from importlib import resources
29from zipfile import ZipFile
30
31from apex_manifest import ValidateApexManifest
32from apex_manifest import ParseApexManifest
33
34logger = logging.getLogger(__name__)
35
36TEST_APEX = "com.android.example.apex"
37TEST_APEX_LEGACY = "com.android.example-legacy.apex"
38TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
39TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"
40
41TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
42TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
43TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
44TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
45TEST_MANIFEST_JSON = os.path.join("testdata", "manifest.json")
46
def run(args, verbose=None, **kwargs):
    """Starts the given command and returns the process object.

    Args:
      args: The command to execute, as a list of strings.
      verbose: When truthy, the command line is written to the module logger
          at INFO level.
      kwargs: Extra keyword arguments forwarded to subprocess.Popen(), such as
          env or stdin. If the caller sets neither stdout nor stderr, stdout is
          piped and stderr is merged into it. universal_newlines is switched on
          unless the caller provides its own value.

    Returns:
      The subprocess.Popen object of the started command.
    """
    caller_redirects_output = 'stdout' in kwargs or 'stderr' in kwargs
    if not caller_redirects_output:
        kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    kwargs.setdefault('universal_newlines', True)

    # With DEBUG_TEST set, echo every command so it can be re-run by hand.
    if DEBUG_TEST:
        print("\nRunning: \n%s\n" % " ".join(args))
    # Logging only happens when the caller asked for it.
    if verbose:
        logger.info("  Running: \"%s\"", " ".join(args))

    return subprocess.Popen(args, **kwargs)
74
75
def run_host_command(args, verbose=None, **kwargs):
    """Runs a host tool and returns its output.

    When ANDROID_BUILD_TOP is set in the environment, the executable name
    (the first element of args) is resolved against
    $ANDROID_BUILD_TOP/out/host/linux-x86/bin; otherwise the command is run
    exactly as given.

    Args:
      args: The command represented as a list of strings. The list is not
          modified.
      verbose: Forwarded to run_and_check_output().
      kwargs: Forwarded to run_and_check_output().

    Returns:
      Whatever run_and_check_output() returns for the resolved command.
    """
    # Work on a copy so the caller's command list is left untouched.
    cmd = list(args)
    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
    if host_build_top:
        cmd[0] = os.path.join(host_build_top, "out/host/linux-x86/bin", cmd[0])
    return run_and_check_output(cmd, verbose, **kwargs)
82
83
def run_and_check_output(args, verbose=None, **kwargs):
    """Executes the command, waits for it to finish and returns what it printed.

    Args:
      args: The command to execute, as a list of strings.
      verbose: When truthy, the command and its output are written to the
          module logger at INFO level.
      kwargs: Extra keyword arguments forwarded to run(), and from there to
          subprocess.Popen().

    Returns:
      The captured output, or an empty string when nothing was captured.

    Raises:
      RuntimeError: If the command exits with a non-zero status.
    """
    process = run(args, verbose=verbose, **kwargs)
    captured = process.communicate()[0]
    # communicate() yields None when stdout was not piped.
    if captured is None:
        captured = ""
    # Logging only happens when the caller asked for it.
    if verbose:
        logger.info("%s", captured.rstrip())
    if process.returncode == 0:
        return captured
    raise RuntimeError(
        "Failed to run command '{}' (exit code {}):\n{}".format(
            args, process.returncode, captured))
113
114
def get_sha1sum(file_path):
    """Returns the hex digest of the contents of the file at file_path.

    Note that, despite the function name, the digest is computed with SHA-256.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # The file object buffers reads, so asking for one hash block at a
        # time is fine. iter() stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(digest.block_size), b''):
            digest.update(block)
    return digest.hexdigest()
127
128
def round_up(size, unit):
    """Rounds size up to the nearest multiple of unit.

    The assertion rejects any unit that shares a bit with unit - 1, i.e. any
    unit that is not a power of two.
    """
    mask = unit - 1
    assert unit & mask == 0
    return (size + mask) & ~mask
132
133# In order to debug test failures, set DEBUG_TEST to True and run the test from
134# local workstation bypassing atest, e.g.:
135# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
136#
# The test will print every command it runs and the list of temporary files
# it created, which are kept instead of being deleted. Compare e.g.
# /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to see where they differ.
141# A simple script to analyze the differences:
142#
143# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
144# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
145#
146# cd ~/tmp/
147# rm -rf input output
148# mkdir input output
149# unzip ${FILE_INPUT} -d input/
150# unzip ${FILE_OUTPUT} -d output/
151#
152# diff -r input/ output/
153#
154# For analyzing binary diffs I had mild success using the vbindiff utility.
155DEBUG_TEST = False
156
157
class ApexerRebuildTest(unittest.TestCase):
    """Repacks prebuilt test APEX files with apexer and compares them to the originals.

    Every test unpacks the host tools bundled as a package resource into a
    temporary directory and puts them on PATH / LD_LIBRARY_PATH. All temporary
    files and directories are recorded in self._to_cleanup so that tearDown can
    delete them (unless DEBUG_TEST is set, in which case they are kept and
    their paths are printed).
    """

    # Environment variables overwritten while a test runs; tearDown puts the
    # values seen at setUp time back so changes do not pile up across tests.
    _MANAGED_ENV_VARS = ("PATH", "LD_LIBRARY_PATH", "APEXER_TOOL_PATH")

    def setUp(self):
        self._to_cleanup = []
        # Snapshot before _get_host_tools() starts editing the environment.
        self._saved_env = {name: os.environ.get(name) for name in self._MANAGED_ENV_VARS}
        self._get_host_tools()

    def tearDown(self):
        # Undo the environment edits first; this is wanted in debug mode too.
        for name, value in getattr(self, "_saved_env", {}).items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value

        if DEBUG_TEST:
            # Keep the files around for inspection and tell the user where they are.
            print(self._to_cleanup)
            return

        for path in self._to_cleanup:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
                continue
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone; keep going so the remaining entries are still removed.
                pass
        del self._to_cleanup[:]

    def _new_temp_dir(self, tag):
        """Creates a temp directory prefixed with the test name and tag, registered for cleanup."""
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + tag)
        self._to_cleanup.append(dir_name)
        return dir_name

    def _new_temp_file(self, tag, suffix):
        """Creates an empty temp file prefixed with the test name and tag, registered for cleanup."""
        fd, file_name = tempfile.mkstemp(prefix=self._testMethodName + tag, suffix=suffix)
        os.close(fd)
        self._to_cleanup.append(file_name)
        return file_name

    @staticmethod
    def _remove_generated_payload_files(dir_name):
        """Deletes the entries that apexer and the e2fs tools add to an extracted payload."""
        for name in ["apex_manifest.json", "apex_manifest.pb"]:
            file_path = os.path.join(dir_name, name)
            if os.path.exists(file_path):
                os.remove(file_path)
        lost_found = os.path.join(dir_name, "lost+found")
        if os.path.isdir(lost_found):
            shutil.rmtree(lost_found)

    def _get_host_tools(self):
        """Unpacks the bundled host tools and points PATH / LD_LIBRARY_PATH at them.

        Sets self.host_tools (tool name -> extracted path, or the bare name when
        the archive does not contain the tool) and self.host_tools_path (the
        extracted bin directory).
        """
        dir_name = self._new_temp_dir("_host_tools_")
        with resources.files("apexer_test").joinpath("apexer_test_host_tools.zip").open('rb') as f:
            with ZipFile(f, 'r') as zip_obj:
                zip_obj.extractall(path=dir_name)

        files = {}
        for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
                  "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
                  "signapk.jar", "android.jar", "make_erofs", "fsck.erofs", "conv_apex_manifest"]:
            file_path = os.path.join(dir_name, "bin", i)
            if os.path.exists(file_path):
                # Files come out of the zip without the execute bit set here.
                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
                files[i] = file_path
            else:
                # Not bundled: fall back to the bare name.
                files[i] = i
        self.host_tools = files
        self.host_tools_path = os.path.join(dir_name, "bin")

        # Extracted tools take precedence over whatever is already on PATH.
        path = self.host_tools_path
        if "PATH" in os.environ:
            path += ":" + os.environ["PATH"]
        os.environ["PATH"] = path

        ld_library_path = os.path.join(dir_name, "lib64")
        if "LD_LIBRARY_PATH" in os.environ:
            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        os.environ["LD_LIBRARY_PATH"] = ld_library_path

    def _extract_resource(self, resource_name):
        """Copies a packaged resource to a temp file (registered for cleanup); returns its path."""
        with (
            resources.files("apexer_test").joinpath(resource_name).open('rb') as f,
            tempfile.NamedTemporaryFile(prefix=resource_name.replace('/', '_'), delete=False) as f2,
        ):
            self._to_cleanup.append(f2.name)
            shutil.copyfileobj(f, f2)
            return f2.name

    def _get_container_files(self, apex_file_path):
        """Unzips the APEX container and returns a dict of entry name -> extracted path.

        Only the known container entries that are present are listed. The
        protobuf manifest, the build info and a payload (image or zip) must
        exist; the payload is additionally exposed under the "apex_payload" key.
        """
        dir_name = self._new_temp_dir("_container_files_")
        with ZipFile(apex_file_path, 'r') as zip_obj:
            zip_obj.extractall(path=dir_name)
        files = {}
        for i in ["apex_manifest.json", "apex_manifest.pb",
                  "apex_build_info.pb", "assets",
                  "apex_payload.img", "apex_payload.zip"]:
            file_path = os.path.join(dir_name, i)
            if os.path.exists(file_path):
                files[i] = file_path
        self.assertIn("apex_manifest.pb", files)
        self.assertIn("apex_build_info.pb", files)

        # The image form wins when both payload forms are present.
        image_file = files.get("apex_payload.img", files.get("apex_payload.zip"))
        self.assertIsNotNone(image_file)
        files["apex_payload"] = image_file

        return files

    def _extract_payload_from_img(self, img_file_path):
        """Dumps a payload image into a temp directory with debugfs; returns the directory."""
        dir_name = self._new_temp_dir("_extracted_payload_")
        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
        run_host_command(cmd)

        # Remove payload files added by apexer and e2fs tools.
        self._remove_generated_payload_files(dir_name)
        return dir_name

    def _extract_payload(self, apex_file_path):
        """Extracts the payload of an APEX file with deapexer; returns the directory."""
        dir_name = self._new_temp_dir("_extracted_payload_")
        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
               "--fsckerofs_path", self.host_tools["fsck.erofs"], "extract",
               apex_file_path, dir_name]
        run_host_command(cmd)

        # Remove payload files added by apexer and e2fs tools.
        self._remove_generated_payload_files(dir_name)
        return dir_name

    def _run_apexer(self, container_files, payload_dir, args=None):
        """Runs apexer over payload_dir and returns the path of the file it produced.

        Args:
          container_files: Dict as returned by _get_container_files().
          payload_dir: Directory holding the payload contents to pack.
          args: Optional extra apexer flags. "--payload_only" and
              "--unsigned_payload_only" also change which inputs are passed
              and the suffix of the output file.

        Returns:
          Path of the output file (registered for cleanup).
        """
        # A fresh list per call; a shared mutable default would leak between calls.
        args = [] if args is None else list(args)
        unsigned_payload_only = "--unsigned_payload_only" in args
        payload_only = unsigned_payload_only or "--payload_only" in args

        os.environ["APEXER_TOOL_PATH"] = (self.host_tools_path +
            ":out/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
        if DEBUG_TEST:
            cmd.append('-v')
        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
        if "apex_manifest.json" in container_files:
            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
        if not payload_only and "assets" in container_files:
            cmd.extend(["--assets_dir", container_files["assets"]])
        if not unsigned_payload_only:
            cmd.extend(["--key", self._extract_resource(TEST_PRIVATE_KEY)])
            cmd.extend(["--pubkey", self._extract_resource(TEST_AVB_PUBLIC_KEY)])
        cmd.extend(args)

        # Decide on output file name
        apex_suffix = ".payload" if payload_only else ".apex.unsigned"
        fn = self._new_temp_file("_repacked_", apex_suffix)
        cmd.extend([payload_dir, fn])

        run_host_command(cmd)
        return fn

    def _get_java_toolchain(self):
        """Returns [java executable, java.library.path value] for running signapk.

        The executable is the first match among: the in-tree JDK 21 prebuilt
        (relative, then under /), $ANDROID_JAVA_TOOLCHAIN, $ANDROID_JAVA_HOME,
        $JAVA_HOME, and finally plain "java" from PATH.
        """
        java_toolchain = "java"
        if os.path.isfile("prebuilts/jdk/jdk21/linux-x86/bin/java"):
            java_toolchain = "prebuilts/jdk/jdk21/linux-x86/bin/java"
        elif os.path.isfile("/jdk/jdk21/linux-x86/bin/java"):
            java_toolchain = "/jdk/jdk21/linux-x86/bin/java"
        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
        elif "ANDROID_JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
        elif "JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

        # LD_LIBRARY_PATH is always present here: _get_host_tools() sets it in setUp.
        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        if "ANDROID_BUILD_TOP" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                "out/host/linux-x86/lib64")

        return [java_toolchain, java_dep_lib]

    def _sign_apk_container(self, unsigned_apex):
        """Signs the APEX container with signapk; returns the path of the signed file."""
        fn = self._new_temp_file("_repacked_", ".apex")
        java_toolchain, java_dep_lib = self._get_java_toolchain()
        cmd = [
            java_toolchain,
            "-Djava.library.path=" + java_dep_lib,
            "-jar", self.host_tools['signapk.jar'],
            "-a", "4096", "--align-file-size",
            self._extract_resource(TEST_X509_KEY),
            self._extract_resource(TEST_PK8_KEY),
            unsigned_apex, fn]
        run_and_check_output(cmd)
        return fn

    def _sign_payload(self, container_files, unsigned_payload):
        """Signs a copy of unsigned_payload with avbtool; returns the path of the copy."""
        signed_payload = self._new_temp_file("_repacked_", ".payload")
        # avbtool modifies the image in place, so operate on a copy.
        shutil.copyfile(unsigned_payload, signed_payload)

        cmd = ['avbtool']
        cmd.append('add_hashtree_footer')
        cmd.append('--do_not_generate_fec')
        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
        cmd.extend(['--hash_algorithm', 'sha256'])
        cmd.extend(['--key', self._extract_resource(TEST_PRIVATE_KEY)])
        manifest_apex = ParseApexManifest(container_files["apex_manifest.pb"])
        ValidateApexManifest(manifest_apex)
        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
        # Set up the salt based on manifest content which includes name
        # and version
        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
        cmd.extend(['--salt', salt])
        cmd.extend(['--image', signed_payload])
        run_and_check_output(cmd)

        return signed_payload

    def _verify_payload(self, payload):
        """Verifies that the payload is properly signed by avbtool"""
        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
        run_and_check_output(cmd)

    def _run_build_test(self, apex_name):
        """Unpacks the named test APEX, rebuilds and re-signs it, and expects identical bytes."""
        apex_file_path = self._extract_resource(apex_name + ".apex")
        if DEBUG_TEST:
            # Keep a recognisably named copy of the input for manual diffing.
            fn = self._new_temp_file("_input_", ".apex")
            shutil.copyfile(apex_file_path, fn)
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

    def test_simple_apex(self):
        self._run_build_test(TEST_APEX)

    def test_legacy_apex(self):
        self._run_build_test(TEST_APEX_LEGACY)

    def test_output_payload_only(self):
        """Assert that payload-only output from apexer is same as the payload we get by unzipping
        apex.
        """
        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
        self._verify_payload(payload_only_file_path)
        self.assertEqual(get_sha1sum(payload_only_file_path),
                         get_sha1sum(container_files["apex_payload"]))

    def test_output_unsigned_payload_only(self):
        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
        same as the payload we get by unzipping apex.
        """
        apex_file_path = self._extract_resource(TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                  ["--unsigned_payload_only"])
        # The unsigned payload must be rejected by avbtool before it is signed.
        with self.assertRaises(RuntimeError) as error:
            self._verify_payload(unsigned_payload_only_file_path)
        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
        self.assertEqual(get_sha1sum(signed_payload),
                         get_sha1sum(container_files["apex_payload"]))

    def test_apex_with_logging_parent(self):
        self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

    def test_apex_with_overridden_package_name(self):
        self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)

    def test_conv_apex_manifest(self):
        # .pb generation from json
        manifest_json_path = self._extract_resource(TEST_MANIFEST_JSON)

        fn = self._new_temp_file("_manifest_", ".pb")
        cmd = [
            "conv_apex_manifest",
            "proto",
            manifest_json_path,
            "-o", fn]
        run_and_check_output(cmd)

        with open(manifest_json_path) as fd_json:
            manifest_json = json.load(fd_json)
        manifest_apex = ParseApexManifest(fn)
        ValidateApexManifest(manifest_apex)

        self.assertEqual(manifest_apex.name, manifest_json["name"])
        self.assertEqual(manifest_apex.version, manifest_json["version"])

        # setprop check on already generated .pb
        next_version = 20
        cmd = [
            "conv_apex_manifest",
            "setprop",
            "version", str(next_version),
            fn]
        run_and_check_output(cmd)

        manifest_apex = ParseApexManifest(fn)
        ValidateApexManifest(manifest_apex)

        self.assertEqual(manifest_apex.version, next_version)
464
465
466
467if __name__ == '__main__':
468    unittest.main(verbosity=2)
469