#!/usr/bin/python
"""Summarize the results of many RAPPOR analysis runs.

Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
and log.txt files.  Writes a CSV to stdout.  Row key is (metric, date).
"""
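
# Usage sketch (arguments inferred from main() below; paths illustrative):
#
#   find ~/rappor/cron -name STATUS.txt | ./combine_status.py dist mem.csv > dist.csv
#   find ~/rappor/cron -name STATUS.txt | ./combine_status.py assoc > assoc.csv
#
# The second argument, a CSV from a memory tracker (see ParseMemCsv), is
# optional.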

import collections
import csv
import json
import os
import re
import sys


# Parse bash 'time' output:
# real    0m11.578s

# TODO: Parse the time from metrics.json instead.
TIMING_RE = re.compile(
    r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)
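
# e.g. TIMING_RE.search('real    0m11.578s').groups() == ('0', '11.578')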

# TODO: Could have decode-dist and decode-assoc output the PID?
PID_RE = re.compile(
    r'write_pid.py: PID (\d+)')  # not VERBOSE, spaces are literal
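
# e.g. PID_RE.match('write_pid.py: PID 1234').group(1) == '1234'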


def ParseMemCsv(f):
  """Compute summary stats for memory.

  vm5_peak_kib -> max(vm_peak_kib)  # over 5 second intervals.  Since it uses
  the kernel, it's accurate except for tasks that spike in their last 4
  seconds.

  vm5_mean_kib -> mean(vm_size_kib)  # over 5 second intervals
  """
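  # A sketch of the expected input (column names assumed; only the pid, peak,
  # and size columns are used):
  #
  #   timestamp,pid,vm_rss_kib,vm_peak_kib,vm_size_kib
  #   1,100,,500,400
  #   2,100,,600,450
  #
  # would yield {'100': (600, 425)}.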
  peak_by_pid = collections.defaultdict(list)
  size_by_pid = collections.defaultdict(list)

  # Parse columns we care about, by PID
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      continue  # skip header
    # looks like timestamp, pid, then (rss, peak, size)
    _, pid, _, peak, size = row
    if peak != '':
      peak_by_pid[pid].append(int(peak))
    if size != '':
      size_by_pid[pid].append(int(size))

  mem_by_pid = {}

  # Now compute summaries
  pids = peak_by_pid.keys()
  for pid in pids:
    peaks = peak_by_pid[pid]
    vm5_peak_kib = max(peaks)

    sizes = size_by_pid[pid]
    vm5_mean_kib = sum(sizes) / len(sizes)

    mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)

  return mem_by_pid


def CheckJobId(job_id, parts):
  """Sanity check for date or smoke test."""
  if not job_id.startswith('201') and not job_id.startswith('smoke'):
    raise RuntimeError(
        "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
        (job_id, parts))


def ReadStatus(f):
  status_line = f.readline().strip()
  return status_line.split()[0]  # OK, TIMEOUT, FAIL
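
# For example, a STATUS.txt whose first line is 'FAIL <reason>' (contents
# illustrative) makes ReadStatus return 'FAIL'.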


def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
  """Read status task paths from stdin, write CSV summary to c_out."""

  #util.log('%s', mem_by_pid)

  # Parses:
  # - input path for metric name and date
  # - spec.txt for task params
  # - STATUS.txt for task success/failure
  # - metrics.json for output metrics
  # - log.txt for timing, if it ran to completion
  #   - and for structured data
  # - join with mem by PID

  header = (
      'job_id', 'params_file', 'map_file',
      'metric', 'date',
      'vm5_peak_kib', 'vm5_mean_kib',  # set when not skipped
      'seconds', 'status',
      # only set when OK
      'num_reports', 'num_rappor', 'allocated_mass',
      # only set when failed
      'fail_reason')
  c_out.writerow(header)

  for line in stdin:
    #
    # Receive a STATUS.txt path on each line of stdin, and parse it.
    #
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    # Path should look like this:
    # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
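    # parts[-5] below is then the job ID, e.g. '2015-05-20__19-22-01'.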
    parts = status_path.split('/')
    job_id = parts[-5]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the order of these 7 fields.
      # There are 3 job constants on the front.
      (num_reports, metric_name, date, counts_path, params_path,
       map_path, _) = spec_line.split()
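
    # A sketch of one spec line (values illustrative, order per the unpack
    # above):
    #   10000 Settings.NewTabPage 2015-05-19 counts.csv params.csv map.csv out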

    # NOTE: These are all constant per metric.  Could have another CSV and
    # join.  But denormalizing is OK for now.
    params_file = os.path.basename(params_path)
    map_file = os.path.basename(map_path)

    # remove extension
    params_file, _ = os.path.splitext(params_file)
    map_file, _ = os.path.splitext(map_file)

    #
    # Read the log
    #
    log_file = os.path.join(result_dir, 'log.txt')
    with open(log_file) as f:
      lines = f.readlines()

    # Search lines in reverse order for total time.  It could have output from
    # multiple 'time' statements, and we want the last one.
    seconds = None  # for skipped
    for i in xrange(len(lines) - 1, -1, -1):
      # TODO: Parse the R timing too.  Could use LOG_RECORD_RE.
      m = TIMING_RE.search(lines[i])
      if m:
        min_part, sec_part = m.groups()
        seconds = float(min_part) * 60 + float(sec_part)
        break
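    # e.g. 'real    1m11.578s' gives 1 * 60 + 11.578 = 71.578 seconds.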

    # Extract stack trace
    if status == 'FAIL':
      # Stack trace looks like: "Calls: main -> RunOne ..."
      fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
    else:
      fail_reason = None

    # Extract PID and join with memory results
    pid = None
    vm5_peak_kib = None
    vm5_mean_kib = None
    if mem_by_pid:
      for line in lines:
        m = PID_RE.match(line)
        if m:
          pid = m.group(1)
          # Could the PID be missing if the process was super short, i.e. ran
          # for less than 5 seconds?
          try:
            vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
          except KeyError:  # sometimes we don't add mem-track on the front
            vm5_peak_kib, vm5_mean_kib = None, None
          break
    else:
      pass  # we weren't passed memory.csv

    #
    # Read the metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    num_rappor = metrics.get('num_detected')
    allocated_mass = metrics.get('allocated_mass')
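
    # A metrics.json sketch (keys per the two .get() calls above; values
    # illustrative): {"num_detected": 42, "allocated_mass": 0.9}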

    # Construct and write row
    row = (
        job_id, params_file, map_file,
        metric_name, date,
        vm5_peak_kib, vm5_mean_kib,
        seconds, status,
        num_reports, num_rappor, allocated_mass,
        fail_reason)

    c_out.writerow(row)


def CombineAssocTaskStatus(stdin, c_out):
  """Read status task paths from stdin, write CSV summary to c_out."""

  header = (
      'job_id', 'metric', 'date', 'status', 'num_reports',
      'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
      'd2')

  c_out.writerow(header)

  for line in stdin:
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    parts = status_path.split('/')
    job_id = parts[-6]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'assoc-spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh for the order of these fields: 5 job constants on the
      # front, then 8 task params.
      (_, _, _, _, _,
       dummy_num_reports, metric_name, date, reports, var1, var2, map1,
       output_dir) = spec_line.split()

    #
    # Parse decode-assoc metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    # After we run it we have the actual number of reports
    num_reports = metrics.get('num_reports')
    total_elapsed_seconds = metrics.get('total_elapsed_time')
    em_elapsed_seconds = metrics.get('em_elapsed_time')
    estimate_dimensions = metrics.get('estimate_dimensions')
    if estimate_dimensions:
      d1, d2 = estimate_dimensions
    else:
      d1, d2 = (0, 0)  # unknown
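
    # An assoc-metrics.json sketch (keys per the .get() calls above; values
    # illustrative):
    #   {"num_reports": 10000, "total_elapsed_time": 12.3,
    #    "em_elapsed_time": 8.9, "estimate_dimensions": [4, 5]}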

    row = (
        job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
        em_elapsed_seconds, var1, var2, d1, d2)
    c_out.writerow(row)


def main(argv):
  action = argv[1]

  try:
    mem_csv = argv[2]
  except IndexError:
    mem_by_pid = None
  else:
    with open(mem_csv) as f:
      mem_by_pid = ParseMemCsv(f)

  if action == 'dist':
    c_out = csv.writer(sys.stdout)
    CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)

  elif action == 'assoc':
    c_out = csv.writer(sys.stdout)
    CombineAssocTaskStatus(sys.stdin, c_out)

  else:
    raise RuntimeError('Invalid action %r' % action)


if __name__ == '__main__':
  try:
    main(sys.argv)
  except RuntimeError as e:
    print >>sys.stderr, 'FATAL: %s' % e
    sys.exit(1)