#!/usr/bin/env python3
"""
Script to synchronize (local > remote and vice versa) test data files from/to
GCS.

//test/data files are not checked into the codebase because they are large
binary files and change frequently. Instead we check in only xxx.sha256 files,
which contain the SHA-256 of the actual binary file, and sync them from a GCS
bucket.

Files in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .

Usage:
./test_data status    # Prints the status of new & modified files.
./test_data download  # To sync remote > local (used by install-build-deps).
./test_data upload    # To upload newly created and modified files.
"""

import argparse
import hashlib
import logging
import os
import subprocess
import sys

from collections import defaultdict, namedtuple
from multiprocessing.pool import ThreadPool

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'
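
# Illustration of the content-indexed layout described above (the file name is
# made up, purely for example purposes):
#   test/data/example.pftrace         <- actual binary, synced, never checked
#                                        in.
#   test/data/example.pftrace.sha256  <- checked in, holds the hex digest.
#   gs://perfetto/test_data/example.pftrace-<sha256 hex>  <- the GCS object.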

FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'

FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])
args = None


def relpath(path):
  return os.path.relpath(path, ROOT_DIR)


def download(url, out_file):
  subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])


def list_files(path, scan_new_files=False):
  """Lists files recursively in path.

  If scan_new_files=False, returns only files with a matching xxx.sha256
  tracker. If scan_new_files=True, returns all files, including untracked ones.
  """
  seen = set()
  for root, _, files in os.walk(path):
    for fname in files:
      if fname.endswith('.swp'):
        continue  # Temporary files left around if CTRL-C-ing while downloading.
      if fname in ['OWNERS', 'README.md']:
        continue  # OWNERS and README.md files should not be uploaded.
      fpath = os.path.join(root, fname)
      if not os.path.isfile(fpath) or fname.startswith('.'):
        continue
      if fpath.endswith(SUFFIX):
        fpath = fpath[:-len(SUFFIX)]
      elif not scan_new_files:
        continue
      if fpath not in seen:
        seen.add(fpath)
        yield fpath


def hash_file(fpath):
  hasher = hashlib.sha256()
  with open(fpath, 'rb') as f:
    for chunk in iter(lambda: f.read(32768), b''):
      hasher.update(chunk)
  return hasher.hexdigest()


def map_concurrently(fn, files):
  done = 0
  for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
    assert isinstance(fs, FileStat)
    done += 1
    if not args.quiet:
      print(
          '[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
          end='\r')
  if not args.quiet:
    print('')
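

# Quick reference for the states computed by get_file_status() below:
#   .sha256 present, file absent  -> FS_MISSING  (needs download).
#   .sha256 absent,  file present -> FS_NEW_FILE (needs upload).
#   both present, digests match   -> FS_MATCH.
#   both present, digests differ  -> FS_MODIFIED.
#   both absent: cannot happen for paths produced by list_files(), raises.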
def get_file_status(fpath):
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None
  expected_digest = None
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    raise Exception(fpath)
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)


def cmd_upload(dir):
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if args.dry_run:
    return 0
  if not args.quiet:
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
    print('About to upload %d files' % len(files_to_upload))
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
    assert fs.actual_digest is not None
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                             fs.actual_digest)
    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
    # Write the digest to a temp file, then atomically rename it over the
    # .sha256 tracker, so an interrupted run never leaves a torn file.
    with open(fs.path + SUFFIX + '.swp', 'w') as f:
      f.write(fs.actual_digest)
    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0


def cmd_clean(dir):
  all_files = list_files(dir, scan_new_files=True)
  files_to_clean = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_clean.append(fs.path)
  if len(files_to_clean) == 0:
    if not args.quiet:
      print('No modified or new files require cleaning')
    return 0
  if args.dry_run:
    return 0
  if not args.quiet:
    print('\n'.join(relpath(f) for f in files_to_clean))
    print('')
    print('About to remove %d files' % len(files_to_clean))
    input('Press a key to continue or CTRL-C to abort')
  list(map(os.remove, files_to_clean))
  return 0


def cmd_download(dir, overwrite_locally_modified=False):
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status == FS_MISSING:
      files_to_download.append(fs)
    elif fs.status == FS_MODIFIED:
      modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be '
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re-run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them to the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(args.dir)))
  if args.dry_run:
    # files_to_download holds FileStat tuples, not strings: print their paths.
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
    assert fs.expected_digest is not None
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                        fs.expected_digest)
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0
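

# cmd_status() below doubles as a sync check: it returns 0 (and so does the
# process, via sys.exit(main())) only when every scanned file matches its
# .sha256 tracker. Sample output for out-of-sync files (paths are made up):
#   needs download : test/data/example_1.pftrace
#   modified       : test/data/example_2.pftrace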
def cmd_status(dir):
  files = list_files(dir, scan_new_files=True)
  file_by_status = defaultdict(list)
  num_files = 0
  num_out_of_sync = 0
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
    file_by_status[fs.status].append(relpath(fs.path))
    num_files += 1
  for status, rpaths in sorted(file_by_status.items()):
    if status == FS_NEW_FILE and args.ignore_new:
      continue
    if status != FS_MATCH:
      for rpath in rpaths:
        num_out_of_sync += 1
        if not args.quiet:
          print('%-15s: %s' % (status, rpath))
  if num_out_of_sync == 0:
    if not args.quiet:
      print('Scanned %d files in //%s, everything in sync.' %
            (num_files, relpath(dir)))
    return 0
  return 1


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('--ignore-new', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload', 'clean'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  if args.cmd == 'status':
    return cmd_status(args.dir)
  if args.cmd == 'download':
    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
  if args.cmd == 'upload':
    return cmd_upload(args.dir)
  if args.cmd == 'clean':
    return cmd_clean(args.dir)
  print('Unknown command: %s' % args.cmd)
  return 1


if __name__ == '__main__':
  sys.exit(main())