117 lines
3.8 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import json
import pandas
import subprocess
import sys
from pathlib import Path
def load_timing_file(timing_path: Path):
    """Load one timing file into a single-row DataFrame.

    The file is parsed as a whitespace-separated table that must contain a
    ``#Name`` and a ``TotalTime`` column.  The first data row is dropped
    (presumably an aggregate/summary entry -- TODO confirm against the timing
    file format).  Every remaining ``#Name`` value becomes a column whose
    single cell holds the matching ``TotalTime`` value.

    Args:
        timing_path: Path of the timing file to read.

    Returns:
        A ``pandas.DataFrame`` with exactly one row.

    Raises:
        KeyError: If the ``#Name`` or ``TotalTime`` column is missing.
    """
    # sep=r"\s+" is the documented replacement for the deprecated
    # delim_whitespace=True keyword and parses the file the same way.
    table = pandas.read_csv(timing_path, sep=r"\s+")
    # .iloc makes the "skip the first row" intent explicitly positional.
    names = list(table["#Name"].iloc[1:])
    total_times = list(table["TotalTime"].iloc[1:])
    return pandas.DataFrame([total_times], columns=names)
def load_all_timings(outdir_path: Path):
    """Read every file in ``<outdir_path>/timing`` and stack the results.

    The directory entries are visited in sorted order.  A one-line progress
    indicator is rewritten on stderr after each file has been loaded.

    Args:
        outdir_path: Output directory that contains a ``timing`` subdirectory.

    Returns:
        The ``pandas.concat`` of the frames returned by ``load_timing_file``
        for each entry of the ``timing`` directory.
    """
    timingdir_path = outdir_path / "timing"
    files = sorted(timingdir_path.iterdir())
    total = len(files)
    print(f"Loading {total} timing files from {timingdir_path}...", file=sys.stderr)

    frames = []
    for done, path in enumerate(files, start=1):
        frames.append(load_timing_file(path))
        # Wipe the previous progress text and return to the start of the line
        # so the indicator is redrawn in place.
        sys.stderr.write("\x1b[1K\r")
        sys.stderr.flush()
        percent = done / total * 100
        print(f"[{done:8}/{total:8}] {percent:6.2f}%", file=sys.stderr, end="", flush=True)

    # Finish the progress line with a newline.
    print(file=sys.stderr, flush=True)
    return pandas.concat(frames)
def load_mean_timings(outdir_path: Path):
    """Return the per-column mean of all timings of one output directory.

    The timings of every file below ``outdir_path`` are loaded with
    ``load_all_timings``, averaged column-wise and divided by 1000000
    (presumably converting microseconds to seconds -- TODO confirm the unit
    used in the timing files).

    Args:
        outdir_path: Output directory that contains a ``timing`` subdirectory.

    Returns:
        The column-wise means, scaled by 1/1000000.
    """
    all_timings = load_all_timings(outdir_path)
    column_means = all_timings.mean()
    return column_means / 1_000_000
def get_outdirs(
    jobid: str,
    root: Path = Path("/p/scratch/cellsinsilico/paul/nastja-out"),
):
    """Return the sorted output directories whose name contains *jobid*.

    Matching is a plain substring glob (``*<jobid>*``) on the direct children
    of *root*, so a job id that is a substring of another job id also matches
    that other job's directories.

    Args:
        jobid: Job id to look for in the directory names.
        root: Directory that holds the output directories.  Defaults to the
            scratch location this script was written for; pass another path
            to analyze results stored elsewhere.

    Returns:
        A sorted list of matching ``Path`` objects (empty if nothing matches).
    """
    print(f"Globbing for {jobid}...", file=sys.stderr)
    return sorted(Path(root).glob(f"*{jobid}*"))
def load_array_mean_timings(jobid: str, excluded_array_indices=()):
    """Collect the mean timings of every non-excluded array task of a job.

    Each output directory found by ``get_outdirs`` is treated as one array
    task.  A directory is skipped when the run of digits at the very end of
    its path equals one of the excluded indices.

    Args:
        jobid: Job id used to find the output directories.
        excluded_array_indices: Array task indices whose directories must not
            be loaded.  Defaults to excluding nothing.

    Returns:
        A ``pandas.DataFrame`` with one row per loaded directory and the
        columns of the first loaded directory.  An empty DataFrame is returned
        (with a warning on stderr) when no directory was loaded.
    """
    excluded = {str(index) for index in excluded_array_indices}

    mean_rows = []
    for outdir_path in get_outdirs(jobid):
        dirname = str(outdir_path)
        # Compare the COMPLETE trailing digit run instead of using endswith():
        # a plain suffix test would make index 1 also exclude 11, 21, ...
        trailing_digits = dirname[len(dirname.rstrip("0123456789")):]
        # Tolerate zero-padded directory suffixes such as "_007".
        task_index = trailing_digits.lstrip("0") or "0"
        if trailing_digits and task_index in excluded:
            print(f"Not loading timings for {outdir_path} because it was excluded.", file=sys.stderr)
            continue
        mean_rows.append(load_mean_timings(outdir_path))

    if not mean_rows:
        # Nothing to index into; report it instead of failing on mean_rows[0].
        print(f"WARNING: no timings were loaded for {jobid}", file=sys.stderr)
        return pandas.DataFrame()
    return pandas.DataFrame(mean_rows, columns=mean_rows[0].index)
def get_mean_mean_totaltimes(jobid: str):
    """Return the column-wise mean over the mean timings of all array tasks.

    Args:
        jobid: Job id whose output directories are loaded.

    Returns:
        The result of ``.mean()`` on the frame returned by
        ``load_array_mean_timings``.
    """
    # Pass the exclusion list explicitly: this helper excludes no array task.
    return load_array_mean_timings(jobid, []).mean()
def get_std_mean_totaltimes(jobid: str):
    """Return the column-wise standard deviation of the array mean timings.

    Args:
        jobid: Job id whose output directories are loaded.

    Returns:
        The result of ``.std()`` on the frame returned by
        ``load_array_mean_timings``.
    """
    # Pass the exclusion list explicitly: this helper excludes no array task.
    return load_array_mean_timings(jobid, []).std()
def get_accounting_data(jobid: str):
    """Run ``sacct --json --jobs <jobid>`` and return the parsed JSON output.

    Args:
        jobid: Job id handed to ``sacct``.

    Returns:
        The object decoded from the JSON that ``sacct`` wrote to stdout.

    Raises:
        subprocess.CalledProcessError: If ``sacct`` exits with a non-zero
            status.
    """
    command = ["sacct", "--json", "--jobs", jobid]
    completed = subprocess.run(
        command,
        capture_output=True,
        check=True,  # raise on a non-zero exit code
    )
    raw_json = completed.stdout.decode("utf8")
    return json.loads(raw_json)
def get_jobs(jobid: str):
    """Return the final step of every completed array task of a job.

    The accounting data of *jobid* is fetched with ``get_accounting_data``.
    For each entry of its ``"jobs"`` list the last step is inspected: it is
    kept when its state is ``"COMPLETED"``; otherwise the task's array index
    is recorded as excluded and a warning is written to stderr.  Entries
    without any step are excluded the same way.

    Args:
        jobid: Job id to query.

    Returns:
        A tuple ``(jobs, excluded_array_indices)``: the kept last-step records
        and the array task ids of the excluded entries.
    """
    accounting_data = get_accounting_data(jobid)

    jobs = []
    excluded_array_indices = []
    for array_job in accounting_data["jobs"]:
        # Metadata identifying the array task.
        array_main_job = array_job["array"]["job_id"]
        array_index = array_job["array"]["task_id"]

        steps = array_job["steps"]
        if not steps:
            # Without steps there is nothing to measure; indexing [-1] on the
            # empty list would raise IndexError.
            print(f"WARNING: {array_main_job}.{array_index} has no steps, excluding it from measurements", file=sys.stderr)
            excluded_array_indices.append(array_index)
            continue

        # The last step is the actual job we want the data for;
        # the steps before it set up the cluster etc.
        last_step = steps[-1]
        if last_step["state"] != "COMPLETED":
            print(f"WARNING: {array_main_job}.{array_index} has state {last_step['state']}, excluding it from measurements", file=sys.stderr)
            excluded_array_indices.append(array_index)
            continue

        jobs.append(last_step)

    return jobs, excluded_array_indices
def main():
    """Command-line entry point.

    For every job id given on the command line, load the accounting data and
    the mean timings of its array tasks, then print one JSON list to stdout
    with the per-job means, standard deviations and accounting records.
    Progress and diagnostics go to stderr so stdout stays valid JSON.
    """
    p = argparse.ArgumentParser(description="Load and analyze data from nastja timing files")
    p.add_argument("jobid", nargs="+")
    p.add_argument("--prettify", action="store_true")
    p.add_argument("--dump-timings", action="store_true")
    args = p.parse_args()

    results = []
    for i, jobid in enumerate(args.jobid, 1):
        print(f"({i:2}/{len(args.jobid):2}) Loading accounting data for {jobid}", file=sys.stderr)
        jobs, excluded_array_indices = get_jobs(jobid)
        array_mean_timings = load_array_mean_timings(jobid, excluded_array_indices)
        if args.dump_timings:
            print(array_mean_timings, file=sys.stderr)
        results.append({
            "jobid": jobid,
            "means": array_mean_timings.mean().to_dict(),
            "stds": array_mean_timings.std().to_dict(),
            "accounting": jobs,
        })

    print(json.dumps(results, indent=2 if args.prettify else None))


if __name__ == "__main__":
    main()