| """Per-episode metric extraction (NE / TL / SPL / OSR / nDTW / shortest_path).""" |
| import re |
| import sys |
|
|
| log_path = sys.argv[1] |
| log = open(log_path).read() |
|
|
| |
| pat = re.compile( |
| r'"shortest_path_length":\s*([\d.]+).*?' |
| r'"NE":\s*([\d.]+).*?' |
| r'"success":\s*([\d.]+).*?' |
| r'"osr":\s*([\d.]+).*?' |
| r'"TL":\s*([\d.]+).*?' |
| r'"spl":\s*([\d.]+).*?' |
| r'"ndtw":\s*([\d.]+).*?' |
| r'"steps":\s*(\d+).*?' |
| r'"trajectory_id":\s*"([^"]+)".*?' |
| r'"fail_reason":\s*"([^"]+)"', |
| re.S, |
| ) |
|
|
| finishes = re.compile( |
| r"\[(\d+)/30\]\[step_index:(\d+)\] finish: \[trajectory_id:([^\]]+)\]" |
| r"\[duration:([\d.]+) s\]\[step_count:(\d+)\]\[fps:([\d.]+)\]\[result:([^\]]+)\]" |
| ) |
|
|
| per = {} |
| for m in pat.finditer(log): |
| spl_geo, ne, succ, osr, tl, spl, ndtw, steps, tid, reason = m.groups() |
| per[tid] = dict( |
| spl_geo=float(spl_geo), NE=float(ne), success=float(succ), |
| osr=float(osr), TL=float(tl), SPL=float(spl), nDTW=float(ndtw), |
| steps=int(steps), reason=reason, |
| ) |
|
|
| H = "# | trajectory_id | result | NE | shortP | TL | SPL | OSR | nDTW | steps | sec | fps " |
| print(H) |
| print("-" * len(H)) |
| agg = {k: 0.0 for k in ["NE", "TL", "SPL", "OSR", "nDTW", "spl_geo"]} |
| succ_n = 0 |
| n = 0 |
| fail_tally = {} |
| for m in finishes.finditer(log): |
| idx, step_idx, tid_full, dur, stepc, fps, result = m.groups() |
| parts = tid_full.split("_") |
| if len(parts) >= 8 and "_".join(parts[: len(parts) // 2]) == "_".join(parts[len(parts) // 2 :]): |
| tid_key = "_".join(parts[: len(parts) // 2]) |
| else: |
| tid_key = tid_full |
| e = per.get(tid_key) or {} |
| NE = e.get("NE", float("nan")) |
| TL = e.get("TL", float("nan")) |
| SPL = e.get("SPL", float("nan")) |
| OSR = e.get("osr", float("nan")) |
| nDTW = e.get("nDTW", float("nan")) |
| spg = e.get("spl_geo", float("nan")) |
| succ = e.get("success", 1.0 if result == "success" else 0.0) |
| reason = e.get("reason", result) |
| n += 1 |
| succ_n += int(succ == 1.0) |
| fail_tally[reason] = fail_tally.get(reason, 0) + 1 |
| for k, v in [("NE", NE), ("TL", TL), ("SPL", SPL), ("OSR", OSR), ("nDTW", nDTW), ("spl_geo", spg)]: |
| if v == v: |
| agg[k] += v |
| print( |
| f"{idx:>2} | {tid_key[:24]:<24} | {result[:15]:<15} | " |
| f"{NE:>4.2f} | {spg:>6.2f} | {TL:>4.2f} | {SPL:>4.2f} | " |
| f"{OSR:>4.2f} | {nDTW:>4.2f} | {stepc:>5} | {float(dur):>4.0f} | {float(fps):>4.1f}" |
| ) |
|
|
| print("-" * len(H)) |
| if n: |
| print(f"\nAggregates over {n} episodes:") |
| print(f" SR = {succ_n}/{n} = {100*succ_n/n:5.1f}%") |
| print(f" NE = {agg['NE']/n:5.2f} m (lower better; success_radius = 3.0 m)") |
| print(f" SPL = {agg['SPL']/n:5.3f} (success-weighted, lower path = better)") |
| print(f" OSR = {agg['OSR']/n:5.3f} (oracle success rate — could have stopped within radius at any step)") |
| print(f" nDTW = {agg['nDTW']/n:5.3f} (path alignment 0-1, higher better)") |
| print(f" TL = {agg['TL']/n:5.2f} m (mean trajectory length)") |
| print(f" shortest_path = {agg['spl_geo']/n:5.2f} m (mean geodesic)") |
| print(f" fail_reasons: {fail_tally}") |
|
|