#!/usr/bin/env python # # Copyright (C) 2022 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. # """Common utility functions shared by merge_* scripts. Expects items in OPTIONS prepared by merge_target_files.py. """ import fnmatch import logging import os import re import shutil import zipfile import common logger = logging.getLogger(__name__) OPTIONS = common.OPTIONS def ExtractItems(input_zip, output_dir, extract_item_list): """Extracts items in extract_item_list from a zip to a dir.""" # Filter the extract_item_list to remove any items that do not exist in the # zip file. Otherwise, the extraction step will fail. with zipfile.ZipFile(input_zip, allowZip64=True) as input_zipfile: input_namelist = input_zipfile.namelist() filtered_extract_item_list = [] for pattern in extract_item_list: if fnmatch.filter(input_namelist, pattern): filtered_extract_item_list.append(pattern) common.UnzipToDir(input_zip, output_dir, filtered_extract_item_list) def CopyItems(from_dir, to_dir, copy_item_list): """Copies the items in copy_item_list from source to destination directory. copy_item_list may include files and directories. Will copy the matched files and create the matched directories. Args: from_dir: The source directory. to_dir: The destination directory. copy_item_list: Items to be copied. """ item_paths = [] for root, dirs, files in os.walk(from_dir): item_paths.extend( os.path.relpath(path=os.path.join(root, item_name), start=from_dir) for item_name in files + dirs) filtered = set() for pattern in copy_item_list: filtered.update(fnmatch.filter(item_paths, pattern)) for item in filtered: original_path = os.path.join(from_dir, item) copied_path = os.path.join(to_dir, item) copied_parent_path = os.path.dirname(copied_path) if not os.path.exists(copied_parent_path): os.makedirs(copied_parent_path) if os.path.islink(original_path): os.symlink(os.readlink(original_path), copied_path) elif os.path.isdir(original_path): if not os.path.exists(copied_path): os.makedirs(copied_path) else: shutil.copyfile(original_path, copied_path) def GetTargetFilesItems(target_files_zipfile_or_dir): """Gets a list of target files items.""" if zipfile.is_zipfile(target_files_zipfile_or_dir): with zipfile.ZipFile(target_files_zipfile_or_dir, allowZip64=True) as fz: return fz.namelist() elif os.path.isdir(target_files_zipfile_or_dir): item_list = [] for root, dirs, files in os.walk(target_files_zipfile_or_dir): item_list.extend( os.path.relpath(path=os.path.join(root, item), start=target_files_zipfile_or_dir) for item in dirs + files) return item_list else: raise ValueError('Target files should be either zipfile or directory.') def CollectTargetFiles(input_zipfile_or_dir, output_dir, item_list=None): """Extracts input zipfile or copy input directory to output directory. Extracts the input zipfile if `input_zipfile_or_dir` is a zip archive, or copies the items if `input_zipfile_or_dir` is a directory. Args: input_zipfile_or_dir: The input target files, could be either a zipfile to extract or a directory to copy. output_dir: The output directory that the input files are either extracted or copied. item_list: Files to be extracted or copied. Will extract or copy all files if omitted. """ patterns = item_list if item_list else ('*',) if zipfile.is_zipfile(input_zipfile_or_dir): ExtractItems(input_zipfile_or_dir, output_dir, patterns) elif os.path.isdir(input_zipfile_or_dir): CopyItems(input_zipfile_or_dir, output_dir, patterns) else: raise ValueError('Target files should be either zipfile or directory.') def WriteSortedData(data, path): """Writes the sorted contents of either a list or dict to file. This function sorts the contents of the list or dict and then writes the resulting sorted contents to a file specified by path. Args: data: The list or dict to sort and write. path: Path to the file to write the sorted values to. The file at path will be overridden if it exists. """ with open(path, 'w') as output: for entry in sorted(data): out_str = '{}={}\n'.format(entry, data[entry]) if isinstance( data, dict) else '{}\n'.format(entry) output.write(out_str) def ValidateConfigLists(): """Performs validations on the merge config lists. Returns: False if a validation fails, otherwise true. """ has_error = False # Check that partitions only come from one input. framework_partitions = ItemListToPartitionSet(OPTIONS.framework_item_list) vendor_partitions = ItemListToPartitionSet(OPTIONS.vendor_item_list) from_both = framework_partitions.intersection(vendor_partitions) if from_both: logger.error( 'Cannot extract items from the same partition in both the ' 'framework and vendor builds. Please ensure only one merge config ' 'item list (or inferred list) includes each partition: %s' % ','.join(from_both)) has_error = True if any([ key in OPTIONS.framework_misc_info_keys for key in ('dynamic_partition_list', 'super_partition_groups') ]): logger.error('Dynamic partition misc info keys should come from ' 'the vendor instance of META/misc_info.txt.') has_error = True return not has_error # In an item list (framework or vendor), we may see entries that select whole # partitions. Such an entry might look like this 'SYSTEM/*' (e.g., for the # system partition). The following regex matches this and extracts the # partition name. _PARTITION_ITEM_PATTERN = re.compile(r'^([A-Z_]+)/.*$') _IMAGE_PARTITION_PATTERN = re.compile(r'^IMAGES/(.*)\.img$') _PREBUILT_IMAGE_PARTITION_PATTERN = re.compile(r'^PREBUILT_IMAGES/(.*)\.img$') def ItemListToPartitionSet(item_list): """Converts a target files item list to a partition set. The item list contains items that might look like 'SYSTEM/*' or 'VENDOR/*' or 'OTA/android-info.txt'. Items that end in '/*' are assumed to match entire directories where 'SYSTEM' or 'VENDOR' is a directory name that identifies the contents of a partition of the same name. Other items in the list, such as the 'OTA' example contain metadata. This function iterates such a list, returning a set that contains the partition entries. Args: item_list: A list of items in a target files package. Returns: A set of partitions extracted from the list of items. """ partition_set = set() for item in item_list: for pattern in (_PARTITION_ITEM_PATTERN, _IMAGE_PARTITION_PATTERN, _PREBUILT_IMAGE_PARTITION_PATTERN): partition_match = pattern.search(item.strip()) if partition_match: partition = partition_match.group(1).lower() # These directories in target-files are not actual partitions. if partition not in ('meta', 'images', 'prebuilt_images'): partition_set.add(partition) return partition_set # Partitions that are grabbed from the framework partial build by default. _FRAMEWORK_PARTITIONS = { 'system', 'product', 'system_ext', 'system_other', 'root', 'vbmeta_system', 'pvmfw' } def InferItemList(input_namelist, framework): item_set = set() # Some META items are always grabbed from partial builds directly. # Others are combined in merge_meta.py. if framework: item_set.update([ 'META/liblz4.so', 'META/postinstall_config.txt', 'META/zucchini_config.txt', ]) else: # vendor item_set.update([ 'META/kernel_configs.txt', 'META/kernel_version.txt', 'META/otakeys.txt', 'META/pack_radioimages.txt', 'META/releasetools.py', ]) # Grab a set of items for the expected partitions in the partial build. seen_partitions = [] for namelist in input_namelist: if namelist.endswith('/'): continue partition = namelist.split('/')[0].lower() # META items are grabbed above, or merged later. if partition == 'meta': continue if partition in ('images', 'prebuilt_images'): image_partition, extension = os.path.splitext(os.path.basename(namelist)) if image_partition == 'vbmeta': # Always regenerate vbmeta.img since it depends on hash information # from both builds. continue if extension in ('.img', '.map'): # Include image files in IMAGES/* if the partition comes from # the expected set. if (framework and image_partition in _FRAMEWORK_PARTITIONS) or ( not framework and image_partition not in _FRAMEWORK_PARTITIONS): item_set.add(namelist) elif not framework: # Include all miscellaneous non-image files in IMAGES/* from # the vendor build. item_set.add(namelist) continue # Skip already-visited partitions. if partition in seen_partitions: continue seen_partitions.append(partition) if (framework and partition in _FRAMEWORK_PARTITIONS) or ( not framework and partition not in _FRAMEWORK_PARTITIONS): fs_config_prefix = '' if partition == 'system' else '%s_' % partition item_set.update([ '%s/*' % partition.upper(), 'META/%sfilesystem_config.txt' % fs_config_prefix, ]) return sorted(item_set) def InferFrameworkMiscInfoKeys(input_namelist): keys = [ 'ab_update', 'avb_vbmeta_system', 'avb_vbmeta_system_algorithm', 'avb_vbmeta_system_key_path', 'avb_vbmeta_system_rollback_index_location', 'default_system_dev_certificate', ] for partition in _FRAMEWORK_PARTITIONS: for partition_dir in ('%s/' % partition.upper(), 'SYSTEM/%s/' % partition): if partition_dir in input_namelist: fs_type_prefix = '' if partition == 'system' else '%s_' % partition keys.extend([ 'avb_%s_hashtree_enable' % partition, 'avb_%s_add_hashtree_footer_args' % partition, '%s_disable_sparse' % partition, 'building_%s_image' % partition, '%sfs_type' % fs_type_prefix, ]) return sorted(keys)