xref: /aosp_15_r20/external/perfetto/tools/test_data (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1#!/usr/bin/env python3
2"""
3Script to synchronize (local>remote and viceversa) test data files from/to GCS.
4
5//test/data files are not checked in the codebase because they are large binary
6file and change frequently. Instead we check-in only xxx.sha256 files, which
7contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.
8
9File in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .
10
11Usage:
12./test_data status     # Prints the status of new & modified files.
13./test_data download   # To sync remote>local (used by install-build-deps).
14./test_data upload     # To upload newly created and modified files.
15"""
16
17import argparse
18import logging
19import os
20import sys
21import hashlib
22import subprocess
23
24from multiprocessing.pool import ThreadPool
25from collections import namedtuple, defaultdict
26
27ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
28BUCKET = 'gs://perfetto/test_data'
29SUFFIX = '.sha256'
30
31FS_MATCH = 'matches'
32FS_NEW_FILE = 'needs upload'
33FS_MODIFIED = 'modified'
34FS_MISSING = 'needs download'
35
36FileStat = namedtuple('FileStat',
37                      ['path', 'status', 'actual_digest', 'expected_digest'])
38args = None
39
40
41def relpath(path):
42  return os.path.relpath(path, ROOT_DIR)
43
44
45def download(url, out_file):
46  subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])
47
48
49def list_files(path, scan_new_files=False):
50  """ List files recursively in path.
51
52  If scan_new_files=False, returns only files with a maching xxx.sha256 tracker.
53  If scan_new_files=True returns all files including untracked ones.
54  """
55  seen = set()
56  for root, _, files in os.walk(path):
57    for fname in files:
58      if fname.endswith('.swp'):
59        continue  # Temporary files left around if CTRL-C-ing while downloading.
60      if fname in ["OWNERS", "README.md"]:
61        continue  # OWNERS or README.md file should not be uploaded.
62      fpath = os.path.join(root, fname)
63      if not os.path.isfile(fpath) or fname.startswith('.'):
64        continue
65      if fpath.endswith(SUFFIX):
66        fpath = fpath[:-len(SUFFIX)]
67      elif not scan_new_files:
68        continue
69      if fpath not in seen:
70        seen.add(fpath)
71        yield fpath
72
73
74def hash_file(fpath):
75  hasher = hashlib.sha256()
76  with open(fpath, 'rb') as f:
77    for chunk in iter(lambda: f.read(32768), b''):
78      hasher.update(chunk)
79  return hasher.hexdigest()
80
81
82def map_concurrently(fn, files):
83  done = 0
84  for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
85    assert (isinstance(fs, FileStat))
86    done += 1
87    if not args.quiet:
88      print(
89          '[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
90          end='\r')
91  if not args.quiet:
92    print('')
93
94
95def get_file_status(fpath):
96  sha_file = fpath + SUFFIX
97  sha_exists = os.path.exists(sha_file)
98  file_exists = os.path.exists(fpath)
99  actual_digest = None
100  expected_digest = None
101  if sha_exists:
102    with open(sha_file, 'r') as f:
103      expected_digest = f.readline().strip()
104  if file_exists:
105    actual_digest = hash_file(fpath)
106  if sha_exists and not file_exists:
107    status = FS_MISSING
108  elif not sha_exists and file_exists:
109    status = FS_NEW_FILE
110  elif not sha_exists and not file_exists:
111    raise Exception(fpath)
112  elif expected_digest == actual_digest:
113    status = FS_MATCH
114  else:
115    status = FS_MODIFIED
116  return FileStat(fpath, status, actual_digest, expected_digest)
117
118
119def cmd_upload(dir):
120  all_files = list_files(dir, scan_new_files=True)
121  files_to_upload = []
122  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
123    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
124      files_to_upload.append(fs)
125  if len(files_to_upload) == 0:
126    if not args.quiet:
127      print('No modified or new files require uploading')
128    return 0
129  if args.dry_run:
130    return 0
131  if not args.quiet:
132    print('\n'.join(relpath(f.path) for f in files_to_upload))
133    print('')
134    print('About to upload %d files' % len(files_to_upload))
135    input('Press a key to continue or CTRL-C to abort')
136
137  def upload_one_file(fs):
138    assert (fs.actual_digest is not None)
139    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(
140        fs.path), fs.actual_digest)
141    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
142    logging.debug(' '.join(cmd))
143    subprocess.check_call(cmd)
144    with open(fs.path + SUFFIX + '.swp', 'w') as f:
145      f.write(fs.actual_digest)
146    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
147    return fs
148
149  map_concurrently(upload_one_file, files_to_upload)
150  return 0
151
152
153def cmd_clean(dir):
154  all_files = list_files(dir, scan_new_files=True)
155  files_to_clean = []
156  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
157    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
158      files_to_clean.append(fs.path)
159  if len(files_to_clean) == 0:
160    if not args.quiet:
161      print('No modified or new files require cleaning')
162    return 0
163  if args.dry_run:
164    return 0
165  if not args.quiet:
166    print('\n'.join(relpath(f) for f in files_to_clean))
167    print('')
168    print('About to remove %d files' % len(files_to_clean))
169    input('Press a key to continue or CTRL-C to abort')
170  list(map(os.remove, files_to_clean))
171  return 0
172
173
174def cmd_download(dir, overwrite_locally_modified=False):
175  files_to_download = []
176  modified = []
177  all_files = list_files(dir, scan_new_files=False)
178  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
179    if fs.status == FS_MISSING:
180      files_to_download.append(fs)
181    elif fs.status == FS_MODIFIED:
182      modified.append(fs)
183
184  if len(modified) > 0 and not overwrite_locally_modified:
185    print('WARNING: The following files diverged locally and will NOT be ' +
186          'overwritten if you continue')
187    print('\n'.join(relpath(f.path) for f in modified))
188    print('')
189    print('Re run `download --overwrite` to overwrite locally modified files')
190    print('or `upload` to sync them on the GCS bucket')
191    print('')
192    input('Press a key to continue or CTRL-C to abort')
193  elif overwrite_locally_modified:
194    files_to_download += modified
195
196  if len(files_to_download) == 0:
197    if not args.quiet:
198      print('Nothing to do, all files are synced')
199    return 0
200
201  if not args.quiet:
202    print('Downloading %d files in //%s' %
203          (len(files_to_download), relpath(args.dir)))
204  if args.dry_run:
205    print('\n'.join(files_to_download))
206    return
207
208  def download_one_file(fs):
209    assert (fs.expected_digest is not None)
210    uri = '%s/%s-%s' % (args.bucket, os.path.basename(
211        fs.path), fs.expected_digest)
212    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
213    logging.debug(uri)
214    tmp_path = fs.path + '.swp'
215    download(uri, tmp_path)
216    digest = hash_file(tmp_path)
217    if digest != fs.expected_digest:
218      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
219                      (uri, fs.expected_digest, digest))
220    os.replace(tmp_path, fs.path)
221    return fs
222
223  map_concurrently(download_one_file, files_to_download)
224  return 0
225
226
227def cmd_status(dir):
228  files = list_files(dir, scan_new_files=True)
229  file_by_status = defaultdict(list)
230  num_files = 0
231  num_out_of_sync = 0
232  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
233    file_by_status[fs.status].append(relpath(fs.path))
234    num_files += 1
235  for status, rpaths in sorted(file_by_status.items()):
236    if status == FS_NEW_FILE and args.ignore_new:
237      continue
238    if status != FS_MATCH:
239      for rpath in rpaths:
240        num_out_of_sync += 1
241        if not args.quiet:
242          print('%-15s: %s' % (status, rpath))
243  if num_out_of_sync == 0:
244    if not args.quiet:
245      print('Scanned %d files in //%s, everything in sync.' %
246            (num_files, relpath(dir)))
247    return 0
248  return 1
249
250
251def main():
252  parser = argparse.ArgumentParser()
253  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
254  parser.add_argument('--overwrite', action='store_true')
255  parser.add_argument('--bucket', default=BUCKET)
256  parser.add_argument('--jobs', '-j', default=10, type=int)
257  parser.add_argument('--dry-run', '-n', action='store_true')
258  parser.add_argument('--quiet', '-q', action='store_true')
259  parser.add_argument('--verbose', '-v', action='store_true')
260  parser.add_argument('--ignore-new', action='store_true')
261  parser.add_argument('cmd', choices=['status', 'download', 'upload', 'clean'])
262  global args
263  args = parser.parse_args()
264  logging.basicConfig(
265      format='%(asctime)s %(levelname).1s %(message)s',
266      level=logging.DEBUG if args.verbose else logging.INFO,
267      datefmt=r'%H:%M:%S')
268  if args.cmd == 'status':
269    return cmd_status(args.dir)
270  if args.cmd == 'download':
271    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
272  if args.cmd == 'upload':
273    return cmd_upload(args.dir)
274  if args.cmd == 'clean':
275    return cmd_clean(args.dir)
276  print('Unknown command: %s' % args.cmd)
277
278
279if __name__ == '__main__':
280  sys.exit(main())
281