| """Replay buffer and failure-case mining utilities.""" |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
@dataclass
class ReplayBuffer:
    """Ordered, in-memory collection of record dicts with export helpers.

    A record is treated as a failure when its ``"failure_reasons"`` entry
    exists and is truthy (for example, a non-empty list).
    """

    # Records in the order they were added; each buffer gets its own list.
    records: list[dict[str, Any]] = field(default_factory=list)

    def add(self, payload: dict[str, Any]) -> None:
        """Store *payload* at the end of the buffer (no copy is made)."""
        self.records.append(payload)

    def failures(self) -> list[dict[str, Any]]:
        """Return a new list of the records that carry failure reasons.

        Records whose ``"failure_reasons"`` is missing, ``None`` or empty
        are left out; the relative order of the rest is preserved.
        """
        return [entry for entry in self.records if entry.get("failure_reasons")]

    def dump_jsonl(self, path: Path) -> Path:
        """Write every record to *path* as JSON Lines and return *path*.

        Missing parent directories are created, the file is overwritten,
        and non-ASCII characters are escaped in the output.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", encoding="utf-8") as sink:
            # One JSON document per line, each terminated by a newline.
            sink.writelines(
                json.dumps(entry, ensure_ascii=True) + "\n"
                for entry in self.records
            )
        return path

    def dump_failures_json(self, path: Path) -> Path:
        """Write the failing records to *path* as one JSON array.

        Missing parent directories are created and the file is overwritten.
        The array is indented by two spaces with non-ASCII characters
        escaped. Returns *path*.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        failed = self.failures()
        document = json.dumps(failed, ensure_ascii=True, indent=2)
        path.write_text(document, encoding="utf-8")
        return path
|
|
|
|
def failure_mining_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarize how often each failure reason occurs in *rows*.

    Args:
        rows: Record dicts; a row contributes only when its
            ``"failure_reasons"`` entry is present and truthy.

    Returns:
        A dict with ``"total_rows"`` (number of rows given),
        ``"failure_rows"`` (rows with a truthy ``"failure_reasons"``) and
        ``"top_failure_reasons"``: at most 20 ``{"reason", "count"}`` dicts
        ordered by descending count. Reasons with equal counts keep the
        order in which they were first seen.
    """
    tally: dict[str, int] = {}
    rows_with_failures = 0
    for entry in rows:
        reasons = entry.get("failure_reasons")
        if not reasons:
            # Missing, None or empty: nothing to count for this row.
            continue
        rows_with_failures += 1
        for label in reasons:
            tally[label] = tally.get(label, 0) + 1

    # The sort is stable, so ties stay in first-seen order.
    by_count = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
    top = []
    for label, hits in by_count[:20]:
        top.append({"reason": label, "count": hits})

    summary: dict[str, Any] = {}
    summary["total_rows"] = len(rows)
    summary["failure_rows"] = rows_with_failures
    summary["top_failure_reasons"] = top
    return summary
|
|