"""Replay buffer and failure-case mining utilities.""" from __future__ import annotations from dataclasses import dataclass, field import json from pathlib import Path from typing import Any @dataclass class ReplayBuffer: records: list[dict[str, Any]] = field(default_factory=list) def add(self, payload: dict[str, Any]) -> None: self.records.append(payload) def failures(self) -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] for row in self.records: reasons = row.get("failure_reasons") or [] if reasons: out.append(row) return out def dump_jsonl(self, path: Path) -> Path: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as f: for row in self.records: f.write(json.dumps(row, ensure_ascii=True) + "\n") return path def dump_failures_json(self, path: Path) -> Path: path.parent.mkdir(parents=True, exist_ok=True) failures = self.failures() path.write_text(json.dumps(failures, ensure_ascii=True, indent=2), encoding="utf-8") return path def failure_mining_summary(rows: list[dict[str, Any]]) -> dict[str, Any]: reason_counts: dict[str, int] = {} for row in rows: for reason in row.get("failure_reasons") or []: reason_counts[reason] = reason_counts.get(reason, 0) + 1 ranked = sorted(reason_counts.items(), key=lambda item: item[1], reverse=True) return { "total_rows": len(rows), "failure_rows": sum(1 for row in rows if row.get("failure_reasons")), "top_failure_reasons": [{"reason": k, "count": v} for k, v in ranked[:20]], }