#!/usr/bin/python
"""Summarize the results of many RAPPOR analysis runs.

Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
and log.txt files. Writes a CSV to stdout. Row key is (metric, date).
"""

import collections
import csv
import json
import os
import re
import sys


# Parse bash 'time' output:
# real    0m11.578s

# TODO: Parse the time from metrics.json instead.
TIMING_RE = re.compile(
    r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)
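
# For example (a quick sanity check, not from the original source):
#
#   >>> TIMING_RE.search('real    0m11.578s').groups()
#   ('0', '11.578')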

# TODO: Could have decode-dist and decode-assoc output the PID?
PID_RE = re.compile(
    r'write_pid.py: PID (\d+)')  # not VERBOSE, spaces are literal
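
# For example (again a sanity check, not from the original source):
#
#   >>> PID_RE.match('write_pid.py: PID 1234').group(1)
#   '1234'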


def ParseMemCsv(f):
  """Compute summary stats for memory.

  vm5_peak_kib -> max(vm_peak_kib)  # over 5 second intervals.  Since it uses
      the kernel, it's accurate except for tasks that spike in their last 4
      seconds.

  vm5_mean_kib -> mean(vm_size_kib)  # over 5 second intervals
  """
  peak_by_pid = collections.defaultdict(list)
  size_by_pid = collections.defaultdict(list)

  # Parse columns we care about, by PID
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      continue  # skip header
    # looks like timestamp, pid, then (rss, peak, size)
    _, pid, _, peak, size = row
    if peak != '':
      peak_by_pid[pid].append(int(peak))
    if size != '':
      size_by_pid[pid].append(int(size))

  mem_by_pid = {}

  # Now compute summaries
  pids = peak_by_pid.keys()
  for pid in pids:
    peaks = peak_by_pid[pid]
    vm5_peak_kib = max(peaks)

    sizes = size_by_pid[pid]
    vm5_mean_kib = sum(sizes) / len(sizes)

    mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)

  return mem_by_pid


def CheckJobId(job_id, parts):
  """Sanity check that the job ID is a date or a smoke test."""
  if not job_id.startswith('201') and not job_id.startswith('smoke'):
    raise RuntimeError(
        "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
        (job_id, parts))


def ReadStatus(f):
  """Return the first word of a STATUS.txt file: OK, TIMEOUT, or FAIL."""
  status_line = f.readline().strip()
  return status_line.split()[0]
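

# NOTE: A STATUS.txt is only assumed to begin with one of the words above;
# e.g. a failing task's file might read 'FAIL <reason>' (a hypothetical
# format beyond the first word, which is all ReadStatus looks at).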


def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
  """Read STATUS.txt paths from stdin, write a CSV summary to c_out."""

  #util.log('%s', mem_by_pid)

  # Parses:
  # - input path for metric name and date
  # - spec.txt for task params
  # - STATUS.txt for task success/failure
  # - metrics.json for output metrics
  # - log.txt for timing, if the task ran to completion
  #   - and for structured data
  # - join with memory stats by PID

  header = (
      'job_id', 'params_file', 'map_file',
      'metric', 'date',
      'vm5_peak_kib', 'vm5_mean_kib',  # set when not skipped
      'seconds', 'status',
      # only set when OK
      'num_reports', 'num_rappor', 'allocated_mass',
      # only set when failed
      'fail_reason')
  c_out.writerow(header)

  for line in stdin:
    #
    # Receive a STATUS.txt path on each line of stdin, and parse it.
    #
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    # Path should look like this:
    # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
    parts = status_path.split('/')
    job_id = parts[-5]  # e.g. 2015-05-20__19-22-01 in the path above
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the order of these 7 fields.
      # There are 3 job constants on the front.
      (num_reports, metric_name, date, counts_path, params_path,
       map_path, _) = spec_line.split()
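      # A hypothetical spec.txt line, consistent with the 7-field unpacking
      # above (all values invented for illustration):
      #
      #   1000000 Settings.NewTabPage 2015-05-19 counts.csv params.csv map.csv out/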

    # NOTE: These are all constant per metric. Could have another CSV and
    # join. But denormalizing is OK for now.
    params_file = os.path.basename(params_path)
    map_file = os.path.basename(map_path)

    # remove extension
    params_file, _ = os.path.splitext(params_file)
    map_file, _ = os.path.splitext(map_file)

    #
    # Read the log
    #
    log_file = os.path.join(result_dir, 'log.txt')
    with open(log_file) as f:
      lines = f.readlines()

    # Search lines in reverse order for the total time. The log could have
    # output from multiple 'time' statements, and we want the last one.
    seconds = None  # stays None for skipped tasks
    for i in xrange(len(lines) - 1, -1, -1):
      # TODO: Parse the R timing too. Could use LOG_RECORD_RE.
      m = TIMING_RE.search(lines[i])
      if m:
        min_part, sec_part = m.groups()
        seconds = float(min_part) * 60 + float(sec_part)
        break

    # Extract the stack trace on failure
    if status == 'FAIL':
      # Stack trace looks like: "Calls: main -> RunOne ..."
      fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
    else:
      fail_reason = None

    # Extract the PID and join with the memory results
    pid = None
    vm5_peak_kib = None
    vm5_mean_kib = None
    if mem_by_pid:
      for line in lines:
        m = PID_RE.match(line)
        if m:
          pid = m.group(1)
          # Could the PID be missing if the process was super short, i.e.
          # ran for less than 5 seconds?
          try:
            vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
          except KeyError:  # sometimes we don't add mem-track on the front
            vm5_peak_kib, vm5_mean_kib = None, None
          break
    else:
      pass  # we weren't passed memory.csv

    #
    # Read the metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)
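    # A hypothetical metrics.json, showing the two keys read below (values
    # invented for illustration):
    #
    #   {"num_detected": 42, "allocated_mass": 0.93}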

    num_rappor = metrics.get('num_detected')
    allocated_mass = metrics.get('allocated_mass')

    # Construct and write row
    row = (
        job_id, params_file, map_file,
        metric_name, date,
        vm5_peak_kib, vm5_mean_kib,
        seconds, status,
        num_reports, num_rappor, allocated_mass,
        fail_reason)
    c_out.writerow(row)


def CombineAssocTaskStatus(stdin, c_out):
  """Read STATUS.txt paths from stdin, write a CSV summary to c_out."""

  header = (
      'job_id', 'metric', 'date', 'status', 'num_reports',
      'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
      'd2')

  c_out.writerow(header)

  for line in stdin:
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    parts = status_path.split('/')
    job_id = parts[-6]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'assoc-spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the field order: there are 5 job
      # params on the front, then 8 per-task fields.
      (_, _, _, _, _,
       dummy_num_reports, metric_name, date, reports, var1, var2, map1,
       output_dir) = spec_line.split()

    #
    # Parse decode-assoc metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)
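    # A hypothetical assoc-metrics.json, showing the keys read below (values
    # invented for illustration):
    #
    #   {"num_reports": 10000, "total_elapsed_time": 120.5,
    #    "em_elapsed_time": 95.2, "estimate_dimensions": [16, 8]}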

    # After running the task, we have the actual number of reports
    num_reports = metrics.get('num_reports')
    total_elapsed_seconds = metrics.get('total_elapsed_time')
    em_elapsed_seconds = metrics.get('em_elapsed_time')
    estimate_dimensions = metrics.get('estimate_dimensions')
    if estimate_dimensions:
      d1, d2 = estimate_dimensions
    else:
      d1, d2 = (0, 0)  # unknown

    row = (
        job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
        em_elapsed_seconds, var1, var2, d1, d2)
    c_out.writerow(row)


def main(argv):
  action = argv[1]

  try:
    mem_csv = argv[2]
  except IndexError:
    mem_by_pid = None
  else:
    with open(mem_csv) as f:
      mem_by_pid = ParseMemCsv(f)

  if action == 'dist':
    c_out = csv.writer(sys.stdout)
    CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)

  elif action == 'assoc':
    c_out = csv.writer(sys.stdout)
    CombineAssocTaskStatus(sys.stdin, c_out)

  else:
    raise RuntimeError('Invalid action %r' % action)


if __name__ == '__main__':
  try:
    main(sys.argv)
  except RuntimeError, e:
    print >>sys.stderr, 'FATAL: %s' % e
    sys.exit(1)