#!/usr/bin/env python
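"""Aggregate NAStJA timing data across the runs of SLURM array jobs.

For each given job id, locate the matching output directories, average the
timing files of every run, and print per-timer means and standard deviations
(plus the relevant sacct accounting data) as JSON on stdout. Progress and
warnings go to stderr.
"""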

import argparse
import json
import subprocess
import sys
from pathlib import Path

import pandas


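# Based on the parsing in load_timing_file, a timing file is assumed to be a
# whitespace-delimited table with at least "#Name" and "TotalTime" columns,
# e.g. (illustrative names and values only):
#
#   #Name    TotalTime
#   all      123456789
#   sweep    98765432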
def load_timing_file(timing_path: Path):
    """Read one whitespace-delimited timing file into a single-row DataFrame."""
    # sep=r"\s+" replaces the deprecated delim_whitespace=True option.
    df = pandas.read_csv(timing_path, sep=r"\s+")
    # Skip the first entry; presumably a summary row rather than an individual timer.
    headers = list(df["#Name"][1:])
    times = list(df["TotalTime"][1:])
    return pandas.DataFrame([times], columns=headers)


def load_all_timings(outdir_path: Path):
    """Load every timing file in <outdir>/timing and stack them into one DataFrame."""
    timingdir_path = outdir_path / "timing"
    timing_paths = sorted(timingdir_path.iterdir())

    print(f"Loading {len(timing_paths)} timing files from {timingdir_path}...", file=sys.stderr)
    dfs = []
    for i, timing_path in enumerate(timing_paths, 1):
        dfs.append(load_timing_file(timing_path))
        # Erase the current terminal line and redraw the progress counter in place.
        sys.stderr.write("\x1b[1K\r")
        sys.stderr.flush()
        print(f"[{i:8}/{len(timing_paths):8}] {i/len(timing_paths)*100:6.2f}%", file=sys.stderr, end="", flush=True)
    print(file=sys.stderr, flush=True)

    return pandas.concat(dfs)


def load_mean_timings(outdir_path: Path):
    """Mean of each timer across one run's timing files, scaled to seconds."""
    df = load_all_timings(outdir_path)
    # TotalTime appears to be in microseconds; divide to obtain seconds.
    return df.mean() / 1_000_000


def get_outdirs(jobid: str):
    """Find all output directories whose name contains the given job id."""
    print(f"Globbing for {jobid}...", file=sys.stderr)
    return sorted(Path("/p/scratch/cellsinsilico/paul/nastja-out").glob(f"*{jobid}*"))


def load_array_mean_timings(jobid: str, excluded_array_indices=()):
    """One row of per-timer mean timings per (non-excluded) run of the array job."""
    mts = []
    for outdir_path in get_outdirs(jobid):
        # Excluded runs are identified by matching the array task id as a
        # directory-name suffix.
        if any(str(outdir_path).endswith(str(i)) for i in excluded_array_indices):
            print(f"Not loading timings for {outdir_path} because it was excluded.", file=sys.stderr)
            continue
        mts.append(load_mean_timings(outdir_path))

    return pandas.DataFrame(mts, columns=mts[0].index)


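# Convenience wrappers, apparently for interactive use; the CLI entry point
# below computes means/stds itself, passing an explicit exclusion list.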
def get_mean_mean_totaltimes(jobid: str):
    """Mean over runs of the per-run mean timings."""
    return load_array_mean_timings(jobid).mean()


def get_std_mean_totaltimes(jobid: str):
    """Standard deviation over runs of the per-run mean timings."""
    return load_array_mean_timings(jobid).std()


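# Only a few fields of the sacct JSON are used below. Its shape (which varies
# between Slurm versions) is assumed to be roughly:
#
#   {"jobs": [{"array": {"job_id": ..., "task_id": ...},
#              "steps": [..., {"state": "COMPLETED", ...}]}]}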
def get_accounting_data(jobid: str):
    """Fetch SLURM accounting data for the job via sacct."""
    sacct_results = subprocess.run(
        ["sacct", "--json", "--jobs", jobid],
        check=True,  # Raise on non-zero exit code
        capture_output=True,
    )
    return json.loads(sacct_results.stdout.decode("utf8"))


if __name__ == "__main__":
    p = argparse.ArgumentParser(description="Load and analyze data from NAStJA timing files")
    p.add_argument("jobid", nargs="+")
    p.add_argument("--prettify", action="store_true", help="indent the JSON output")
    args = p.parse_args()

    results = []
    for i, jobid in enumerate(args.jobid, 1):
        print(f"({i:2}/{len(args.jobid):2}) Loading accounting data for {jobid}", file=sys.stderr)
        accounting_data = get_accounting_data(jobid)

        jobs = []
        excluded_array_indices = []
        for array_job in accounting_data["jobs"]:
            # Array metadata: the parent job id and this run's task index.
            array_main_job = array_job["array"]["job_id"]
            array_index = array_job["array"]["task_id"]
            # The last step is the actual job we want the data for;
            # the steps before it set up the cluster etc.
            last_step = array_job["steps"][-1]
            if last_step["state"] != "COMPLETED":
                print(f"WARNING: {array_main_job}.{array_index} has state {last_step['state']}, excluding it from measurements", file=sys.stderr)
                excluded_array_indices.append(array_index)
                continue
            jobs.append(last_step)

        array_mean_timings = load_array_mean_timings(jobid, excluded_array_indices)

        results.append({
            "jobid": jobid,
            "means": array_mean_timings.mean().to_dict(),
            "stds": array_mean_timings.std().to_dict(),
            "accounting": jobs,
        })

    print(json.dumps(results, indent=2 if args.prettify else None))
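
# Example invocation (hypothetical script name and job ids):
#   ./timings.py 1234567 7654321 --prettify > timings.json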