# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper functions for dealing with .zip files."""

import os
import pathlib
import posixpath
import stat
import time
import zipfile

# Size in bytes of the fixed-length part of a zip local file header. The
# variable-length file name and extra field follow it.
_FIXED_ZIP_HEADER_LEN = 30


def _set_alignment(zip_obj, zip_info, alignment):
  """Sets a ZipInfo's extra field such that the file will be aligned.

  Must be called immediately before the entry is written, since the padding
  is computed from the current position of the underlying file object.

  Args:
    zip_obj: The ZipFile object that is being written.
    zip_info: The ZipInfo object about to be written.
    alignment: The amount of alignment (e.g. 4, or 4*1024).
  """
  # The header stores the *encoded* file name, so measure bytes rather than
  # characters. zipfile writes names as ASCII when possible and as UTF-8
  # otherwise; the UTF-8 length is correct in both cases.
  name_len = len(zip_info.filename.encode('utf-8'))
  header_size = _FIXED_ZIP_HEADER_LEN + name_len
  pos = zip_obj.fp.tell() + header_size
  padding_needed = (alignment - (pos % alignment)) % alignment

  # Python writes |extra| to both the local file header and the central
  # directory's file header. Android's zipalign tool writes only to the
  # local file header, so there is more overhead in using Python to align.
  zip_info.extra = b'\0' * padding_needed


def _hermetic_date_time(timestamp=None):
  """Returns a ZipInfo.date_time tuple for |timestamp|.

  Args:
    timestamp: Unix timestamp, interpreted as UTC. When falsy (None or 0), a
      fixed date of 2001-01-01 00:00:00 is used so that output is hermetic.

  Returns:
    A (year, month, day, hour, minute, second) tuple.
  """
  if not timestamp:
    return (2001, 1, 1, 0, 0, 0)
  utc_time = time.gmtime(timestamp)
  return (utc_time.tm_year, utc_time.tm_mon, utc_time.tm_mday, utc_time.tm_hour,
          utc_time.tm_min, utc_time.tm_sec)


def _has_entry(zip_file, name):
  """Returns whether |zip_file| already contains an entry called |name|.

  Uses getinfo() rather than namelist() so that a new list of every entry
  name is not built for each file that is added.
  """
  try:
    zip_file.getinfo(name)
  except KeyError:
    return False
  return True


def add_to_zip_hermetic(zip_file,
                        zip_path,
                        *,
                        src_path=None,
                        data=None,
                        compress=None,
                        alignment=None,
                        timestamp=None):
  """Adds a file to the given ZipFile with a hard-coded modified time.

  Symlinks given via |src_path| are stored as symlinks (their target is not
  followed). For regular files the executable bits of |src_path| are kept.

  Args:
    zip_file: ZipFile instance to add the file to.
    zip_path: Destination path within the zip file (or ZipInfo instance).
    src_path: Path of the source file. Mutually exclusive with |data|.
    data: File data as a string (str or bytes).
    compress: Whether to enable compression. Default is taken from ZipFile
      constructor.
    alignment: If set, align the data of the entry to this many bytes.
    timestamp: The last modification date and time for the archive member.

  Raises:
    AssertionError: If both or neither of |src_path| and |data| are given, if
      |zip_path| is not a canonical relative posix path, or if the entry
      already exists in |zip_file|.
  """
  assert (src_path is None) != (data is None), (
      '|src_path| and |data| are mutually exclusive.')
  if isinstance(zip_path, zipfile.ZipInfo):
    zipinfo = zip_path
    zip_path = zipinfo.filename
  else:
    zipinfo = zipfile.ZipInfo(filename=zip_path)
    zipinfo.external_attr = 0o644 << 16

  zipinfo.date_time = _hermetic_date_time(timestamp)

  if alignment:
    _set_alignment(zip_file, zipinfo, alignment)

  # Filenames can contain backslashes, but it is more likely that we've
  # forgotten to use forward slashes as a directory separator.
  assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
  assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
  assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path
  assert posixpath.normpath(zip_path) == zip_path, (
      f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')
  assert not _has_entry(zip_file, zip_path), (
      'Tried to add a duplicate zip entry: ' + zip_path)

  if src_path and os.path.islink(src_path):
    zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
    # The entry's payload is the link target, not the target's contents.
    zip_file.writestr(zipinfo, os.readlink(src_path))
    return

  if src_path:
    # Maintain the executable bit.
    st = os.stat(src_path)
    for mode in (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH):
      if st.st_mode & mode:
        zipinfo.external_attr |= mode << 16

    with open(src_path, 'rb') as f:
      data = f.read()

  # zipfile will deflate even when it makes the file bigger. To avoid
  # growing files, disable compression at an arbitrary cut off point.
  if len(data) < 16:
    compress = False

  # None converts to ZIP_STORED, when passed explicitly rather than the
  # default passed to the ZipFile constructor.
  compress_type = zip_file.compression
  if compress is not None:
    compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
  zip_file.writestr(zipinfo, data, compress_type)


def add_files_to_zip(inputs,
                     output,
                     *,
                     base_dir=None,
                     compress=None,
                     zip_prefix_path=None,
                     timestamp=None):
  """Creates a zip file from a list of files.

  Entries are written sorted by their zip path so that the output does not
  depend on the order of |inputs|.

  Args:
    inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples.
    output: Path, fileobj, or ZipFile instance to add files to. A ZipFile
      instance is left open; anything else is opened for writing and closed
      before returning.
    base_dir: Prefix to strip from inputs. Defaults to the current directory.
    compress: Whether to compress
    zip_prefix_path: Path prepended to file path in zip file.
    timestamp: Unix timestamp to use for files in the archive.
  """
  if base_dir is None:
    base_dir = '.'
  input_tuples = []
  for tup in inputs:
    if isinstance(tup, str):
      src_path = tup
      zip_path = os.path.relpath(src_path, base_dir)
      # Zip files always use / as path separator.
      if os.path.sep != posixpath.sep:
        zip_path = str(pathlib.Path(zip_path).as_posix())
      tup = (zip_path, src_path)
    input_tuples.append(tup)

  # Sort by zip path to ensure stable zip ordering.
  input_tuples.sort(key=lambda tup: tup[0])

  out_zip = output
  if not isinstance(output, zipfile.ZipFile):
    out_zip = zipfile.ZipFile(output, 'w')

  try:
    for zip_path, fs_path in input_tuples:
      if zip_prefix_path:
        zip_path = posixpath.join(zip_prefix_path, zip_path)
      add_to_zip_hermetic(out_zip,
                          zip_path,
                          src_path=fs_path,
                          compress=compress,
                          timestamp=timestamp)
  finally:
    # Only close the ZipFile if it was created here.
    if output is not out_zip:
      out_zip.close()


def zip_directory(output, base_dir, **kwargs):
  """Zips all files in the given directory.

  Args:
    output: Path, fileobj, or ZipFile instance to add files to.
    base_dir: Directory to walk. Entry paths are relative to it.
    **kwargs: Forwarded to add_files_to_zip().
  """
  inputs = [
      os.path.join(root, f) for root, _, files in os.walk(base_dir)
      for f in files
  ]

  add_files_to_zip(inputs, output, base_dir=base_dir, **kwargs)


def merge_zips(output, input_zips, path_transform=None, compress=None):
  """Combines all files from |input_zips| into |output|.

  Directory entries are dropped. An entry that appears more than once (either
  in several inputs or already in |output|) is written only once, provided
  that every occurrence has the same CRC.

  Args:
    output: Path (str) or ZipFile instance to add files to.
    input_zips: Iterable of paths to zip files to merge.
    path_transform: Called for each entry path. Returns a new path, or None to
      skip the file.
    compress: Overrides compression setting from origin zip entries.

  Raises:
    Exception: If the same destination path appears with differing contents.
  """
  assert not isinstance(input_zips, str)  # Easy mistake to make.
  if isinstance(output, zipfile.ZipFile):
    out_zip = output
    out_filename = output.filename
  else:
    assert isinstance(output, str), 'Was: ' + repr(output)
    out_zip = zipfile.ZipFile(output, 'w')
    out_filename = output

  # Include paths in the existing zip here to avoid adding duplicate files.
  crc_by_name = {i.filename: (out_filename, i.CRC) for i in out_zip.infolist()}

  try:
    for in_file in input_zips:
      with zipfile.ZipFile(in_file, 'r') as in_zip:
        for info in in_zip.infolist():
          # Ignore directories. endswith() also copes with an empty name.
          if info.filename.endswith('/'):
            continue
          if path_transform:
            dst_name = path_transform(info.filename)
            if dst_name is None:
              continue
          else:
            dst_name = info.filename

          data = in_zip.read(info)

          # If there's a duplicate file, ensure contents is the same and skip
          # adding it multiple times.
          if dst_name in crc_by_name:
            orig_filename, orig_crc = crc_by_name[dst_name]
            new_crc = zipfile.crc32(data)
            if new_crc == orig_crc:
              continue
            msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
            raise Exception(msg)

          if compress is not None:
            compress_entry = compress
          else:
            # Keep whatever the source entry used.
            compress_entry = info.compress_type != zipfile.ZIP_STORED
          add_to_zip_hermetic(out_zip,
                              dst_name,
                              data=data,
                              compress=compress_entry)
          crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
  finally:
    # Only close the ZipFile if it was created here.
    if output is not out_zip:
      out_zip.close()