#!/usr/bin/env python import argparse import json import pandas import subprocess import sys from pathlib import Path def load_timing_file(timing_path: Path): df = pandas.read_csv(timing_path, delim_whitespace=True) headers = list(df["#Name"][1:]) times = list(df["TotalTime"][1:]) return pandas.DataFrame([times], columns=headers) def load_all_timings(outdir_path: Path): timingdir_path = outdir_path / "timing" timing_paths = sorted(timingdir_path.iterdir()) print(f"Loading {len(timing_paths)} timing files from {timingdir_path}...", file=sys.stderr) dfs = [] for i, timing_path in enumerate(timing_paths, 1): dfs.append(load_timing_file(timing_path)) sys.stderr.write("\x1b[1K\r") sys.stderr.flush() print(f"[{i:8}/{len(timing_paths):8}] {i/len(timing_paths)*100:6.2f}%", file=sys.stderr, end="", flush=True) print(file=sys.stderr, flush=True) return pandas.concat(dfs) def load_mean_timings(outdir_path: Path): df = load_all_timings(outdir_path) return df.mean() / 1000000 def get_outdirs(jobid: str): print(f"Globbing for {jobid}...", file=sys.stderr) return sorted(Path("/p/scratch/cellsinsilico/paul/nastja-out").glob(f"*{jobid}*")) def load_array_mean_timings(jobid: str, excluded_array_indices): mts = [] for outdir_path in get_outdirs(jobid): if any(str(outdir_path).endswith(str(i)) for i in excluded_array_indices): print(f"Not loading timings for {outdir_path} because it was excluded.", file=sys.stderr) continue mts.append(load_mean_timings(outdir_path)) return pandas.DataFrame(list(mts), columns=mts[0].index) def get_mean_mean_totaltimes(jobid: str): return load_array_mean_timings(jobid).mean() def get_std_mean_totaltimes(jobid: str): return load_array_mean_timings(jobid).std() def get_accounting_data(jobid: str): sacct_results = subprocess.run( ["sacct", "--json", "--jobs", jobid], check=True, # Throw on non-zero exit code, capture_output=True ) return json.loads(sacct_results.stdout.decode("utf8")) def get_jobs(jobid: str): accounting_data = get_accounting_data(jobid) jobs = [] excluded_array_indices = [] for array_job in accounting_data["jobs"]: # Get metadata related to array array_main_job = array_job["array"]["job_id"] array_index = array_job["array"]["task_id"] # The last step is the actual job we want the data for # The steps before set up cluster etc. last_step = array_job["steps"][-1] if last_step["state"] != "COMPLETED": print(f"WARNING: {array_main_job}.{array_index} has state {last_step['state']}, excluding it from measurements", file=sys.stderr) excluded_array_indices.append(array_index) continue jobs.append(last_step) return jobs, excluded_array_indices if __name__ == "__main__": p = argparse.ArgumentParser(description="Load and analzye data from nastja timing files") p.add_argument("jobid", nargs="+") p.add_argument("--prettify", action="store_true") p.add_argument("--dump-timings", action="store_true") args = p.parse_args() results = [] for i, jobid in enumerate(args.jobid, 1): print(f"({i:2}/{len(args.jobid):2}) Loading accounting data for {jobid}", file=sys.stderr) jobs, excluded_array_indices = get_jobs(jobid) array_mean_timings = load_array_mean_timings(jobid, excluded_array_indices) if args.dump_timings: print(array_mean_timings, file=sys.stderr) results.append({ "jobid": jobid, "means": array_mean_timings.mean().to_dict(), "stds": array_mean_timings.std().to_dict(), "accounting": jobs }) print(json.dumps(results, indent=2 if args.prettify else None))