#!/usr/bin/env python3
"""Summarize the results of many RAPPOR analysis runs.

Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
and log.txt files. Writes a CSV to stdout. Row key is (metric, date).
"""

import collections
import csv
import json
import os
import re
import sys


# Parse bash 'time' output:
# real    0m11.578s

# TODO: Parse the time from metrics.json instead.
TIMING_RE = re.compile(
    r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)

# TODO: Could have decode-dist and decode-assoc output the PID?
PID_RE = re.compile(
    r'write_pid\.py: PID (\d+)')  # not VERBOSE, spaces are literal


def ParseMemCsv(f):
  """Compute summary stats for memory.

  vm5_peak_kib -> max(vm_peak_kib)  # over 5 second intervals.  Since it uses
      the kernel, it's accurate except for tasks that spike in their last 4
      seconds.

  vm5_mean_kib -> mean(vm_size_kib)  # over 5 second intervals
  """
  peak_by_pid = collections.defaultdict(list)
  size_by_pid = collections.defaultdict(list)

  # Parse the columns we care about, keyed by PID.
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      continue  # skip header
    # Columns look like: timestamp, pid, then (rss, peak, size).
    _, pid, _, peak, size = row
    if peak != '':
      peak_by_pid[pid].append(int(peak))
    if size != '':
      size_by_pid[pid].append(int(size))

  mem_by_pid = {}

  # Now compute summaries.
  for pid in peak_by_pid:
    peaks = peak_by_pid[pid]
    vm5_peak_kib = max(peaks)

    sizes = size_by_pid[pid]
    vm5_mean_kib = sum(sizes) / len(sizes)

    mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)

  return mem_by_pid


def CheckJobId(job_id, parts):
  """Sanity check: the job ID is a date or a smoke test."""
  if not job_id.startswith('201') and not job_id.startswith('smoke'):
    raise RuntimeError(
        "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
        (job_id, parts))


def ReadStatus(f):
  status_line = f.readline().strip()
  return status_line.split()[0]  # OK, TIMEOUT, FAIL
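
# A minimal usage sketch for ParseMemCsv(), never called by this script.  The
# column layout (timestamp, pid, rss, peak, size) is inferred from the parsing
# code above; the header names and values here are made up.
def _DemoParseMemCsv():
  import io  # local import; only this demo needs it
  fake_csv = io.StringIO(
      'timestamp,pid,rss_kib,vm_peak_kib,vm_size_kib\n'
      '0,1234,,2000,1500\n'
      '5,1234,,2400,1600\n')
  mem_by_pid = ParseMemCsv(fake_csv)
  print(mem_by_pid)  # expected: {'1234': (2400, 1550.0)} -- max peak, mean size
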
def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
  """Read STATUS.txt paths from stdin and write a CSV summary to c_out."""

  #util.log('%s', mem_by_pid)

  # Parses:
  # - input path for metric name and date
  # - spec.txt for task params
  # - STATUS.txt for task success/failure
  # - metrics.json for output metrics
  # - log.txt for timing, if it ran to completion
  #   - and for structured data
  # - join with mem by PID

  header = (
      'job_id', 'params_file', 'map_file',
      'metric', 'date',
      'vm5_peak_kib', 'vm5_mean_kib',  # set when not skipped
      'seconds', 'status',
      # only set when OK
      'num_reports', 'num_rappor', 'allocated_mass',
      # only set when failed
      'fail_reason')
  c_out.writerow(header)

  for line in stdin:
    #
    # Receive a STATUS.txt path on each line of stdin, and parse it.
    #
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    # The path should look like this:
    # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
    parts = status_path.split('/')
    job_id = parts[-5]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the order of these 7 fields.
      # There are 3 job constants on the front.
      (num_reports, metric_name, date, counts_path, params_path,
       map_path, _) = spec_line.split()

    # NOTE: These are all constant per metric.  Could have another CSV and
    # join, but denormalizing is OK for now.
    params_file = os.path.basename(params_path)
    map_file = os.path.basename(map_path)

    # Remove the extensions.
    params_file, _ = os.path.splitext(params_file)
    map_file, _ = os.path.splitext(map_file)

    #
    # Read the log
    #
    log_file = os.path.join(result_dir, 'log.txt')
    with open(log_file) as f:
      lines = f.readlines()
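    # Worked example of the timing parse below (the format is shown in the
    # module-level comment on TIMING_RE): a log line like
    #
    #   real    0m11.578s
    #
    # yields min_part='0' and sec_part='11.578', i.e.
    # seconds = 0 * 60 + 11.578 = 11.578.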
    # Search lines in reverse order for the total time.  The log could have
    # output from multiple 'time' statements, and we want the last one.
    seconds = None  # stays None if the task was skipped
    for i in range(len(lines) - 1, -1, -1):
      # TODO: Parse the R timing too.  Could use LOG_RECORD_RE.
      m = TIMING_RE.search(lines[i])
      if m:
        min_part, sec_part = m.groups()
        seconds = float(min_part) * 60 + float(sec_part)
        break

    # Extract the stack trace.
    if status == 'FAIL':
      # The stack trace looks like: "Calls: main -> RunOne ..."
      fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
    else:
      fail_reason = None

    # Extract the PID and join it with the memory results.
    pid = None
    vm5_peak_kib = None
    vm5_mean_kib = None
    if mem_by_pid:
      for line in lines:
        m = PID_RE.match(line)
        if m:
          pid = m.group(1)
          # The PID may be missing from mem_by_pid if the process was very
          # short, i.e. ran for less than 5 seconds.
          try:
            vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
          except KeyError:  # sometimes we don't add mem-track on the front
            vm5_peak_kib, vm5_mean_kib = None, None
          break
    else:
      pass  # we weren't passed memory.csv

    #
    # Read the metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    num_rappor = metrics.get('num_detected')
    allocated_mass = metrics.get('allocated_mass')

    # Construct and write the row.
    row = (
        job_id, params_file, map_file,
        metric_name, date,
        vm5_peak_kib, vm5_mean_kib,
        seconds, status,
        num_reports, num_rappor, allocated_mass,
        fail_reason)

    c_out.writerow(row)
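
# How job IDs are recovered from STATUS.txt paths: for the 'dist' layout, the
# example path shown in CombineDistTaskStatus(),
#
#   ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
#
# splits on '/' so that parts[-5] == '2015-05-20__19-22-01', which passes
# CheckJobId() because it starts with '201'.  CombineAssocTaskStatus() below
# uses parts[-6] instead, which presumably reflects one extra directory level
# in the assoc output layout.
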
def CombineAssocTaskStatus(stdin, c_out):
  """Read STATUS.txt paths from stdin and write a CSV summary to c_out."""

  header = (
      'job_id', 'metric', 'date', 'status', 'num_reports',
      'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
      'd2')

  c_out.writerow(header)

  for line in stdin:
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    parts = status_path.split('/')
    job_id = parts[-6]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'assoc-spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh for the order of these fields.  The first 5 fields
      # are job constants, which we skip.
      (_, _, _, _, _,
       dummy_num_reports, metric_name, date, reports, var1, var2, map1,
       output_dir) = spec_line.split()

    #
    # Parse decode-assoc metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    # After the task has run, we have the actual number of reports.
    num_reports = metrics.get('num_reports')
    total_elapsed_seconds = metrics.get('total_elapsed_time')
    em_elapsed_seconds = metrics.get('em_elapsed_time')
    estimate_dimensions = metrics.get('estimate_dimensions')
    if estimate_dimensions:
      d1, d2 = estimate_dimensions
    else:
      d1, d2 = (0, 0)  # unknown

    row = (
        job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
        em_elapsed_seconds, var1, var2, d1, d2)
    c_out.writerow(row)


def main(argv):
  try:
    action = argv[1]
  except IndexError:
    raise RuntimeError('Usage: combine_status.py dist|assoc [mem.csv]')

  try:
    mem_csv = argv[2]
  except IndexError:
    mem_by_pid = None
  else:
    with open(mem_csv) as f:
      mem_by_pid = ParseMemCsv(f)

  if action == 'dist':
    c_out = csv.writer(sys.stdout)
    CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)

  elif action == 'assoc':
    c_out = csv.writer(sys.stdout)
    CombineAssocTaskStatus(sys.stdin, c_out)

  else:
    raise RuntimeError('Invalid action %r' % action)


if __name__ == '__main__':
  try:
    main(sys.argv)
  except RuntimeError as e:
    print('FATAL: %s' % e, file=sys.stderr)
    sys.exit(1)
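
# Usage sketch (assumed invocation; the paths are illustrative):
#
#   find ~/rappor/cron -name STATUS.txt | ./combine_status.py dist mem.csv > dist.csv
#   find ~/rappor/cron -name STATUS.txt | ./combine_status.py assoc > assoc.csv
#
# The action ('dist' or 'assoc') is required.  The memory CSV consumed by
# ParseMemCsv() is optional, and is only joined into the 'dist' summary.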