File size: 1,725 Bytes
877add7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""Replay buffer and failure-case mining utilities."""

from __future__ import annotations

import json
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class ReplayBuffer:
    """In-memory buffer of replay records with failure-case extraction.

    Records are arbitrary JSON-serializable dicts; a record counts as a
    failure when its ``failure_reasons`` entry is a non-empty list.
    """

    records: list[dict[str, Any]] = field(default_factory=list)

    def add(self, payload: dict[str, Any]) -> None:
        """Append one record to the buffer."""
        self.records.append(payload)

    def failures(self) -> list[dict[str, Any]]:
        """Return the records whose ``failure_reasons`` is truthy (non-empty)."""
        return [row for row in self.records if row.get("failure_reasons")]

    def dump_jsonl(self, path: Path) -> Path:
        """Write all records to *path* as JSON Lines and return *path*.

        Parent directories are created as needed; an existing file is
        overwritten.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        lines = [json.dumps(row, ensure_ascii=True) + "\n" for row in self.records]
        with path.open("w", encoding="utf-8") as handle:
            handle.writelines(lines)
        return path

    def dump_failures_json(self, path: Path) -> Path:
        """Write only the failure records to *path* as a pretty-printed JSON array."""
        path.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(self.failures(), ensure_ascii=True, indent=2)
        path.write_text(serialized, encoding="utf-8")
        return path


def failure_mining_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate failure statistics over replay *rows*.

    Args:
        rows: Replay records; each may carry a ``failure_reasons`` list of
            strings (missing, ``None``, or empty means "no failure").

    Returns:
        A dict with ``total_rows`` (all rows), ``failure_rows`` (rows with
        at least one failure reason), and ``top_failure_reasons`` — the 20
        most frequent reasons as ``{"reason", "count"}`` dicts, ranked by
        descending count with ties in first-seen order (matching the
        original stable-sort behavior).
    """
    # Counter replaces the hand-rolled dict-of-counts; most_common(20)
    # replaces the manual sort-by-count-descending + slice.
    reason_counts = Counter(
        reason for row in rows for reason in row.get("failure_reasons") or []
    )
    return {
        "total_rows": len(rows),
        "failure_rows": sum(1 for row in rows if row.get("failure_reasons")),
        "top_failure_reasons": [
            {"reason": reason, "count": count}
            for reason, count in reason_counts.most_common(20)
        ],
    }