File size: 3,911 Bytes
a97e900
 
 
 
33d6b64
 
a97e900
 
 
 
 
33d6b64
 
a97e900
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33d6b64
 
a97e900
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""Meta-Tool Miner: Mine repeated successful traces into reusable workflows."""
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import json, hashlib

@dataclass
class MacroTool:
    """A named, reusable sequence of tool calls plus summary statistics.

    Instances are plain records: nothing is validated or computed on
    construction.
    """

    # Identifier of the macro.
    name: str
    # Ordered tool calls that make up the workflow, one dict per call.
    steps: List[Dict]
    # Pattern describing which requests the macro applies to; may be "".
    trigger_pattern: str
    # Summary statistics for the macro (units follow the field names).
    success_rate: float
    avg_cost: float
    avg_latency_ms: float
    usage_count: int
    # Defaults to the empty string; the expected format is not fixed here.
    last_used: str = ""

# Built-in workflow templates. Each entry carries:
#   "name"    - identifier of the workflow,
#   "steps"   - ordered tool names making up the workflow,
#   "trigger" - regular-expression alternation describing matching requests.
COMMON_WORKFLOWS = [
    {
        "name": "search_inspect_patch",
        "steps": ["code_search", "file_read", "file_write"],
        "trigger": "fix.*bug|implement.*feature|refactor",
    },
    {
        "name": "research_answer_verify",
        "steps": ["web_search", "summarize", "verify"],
        "trigger": "research|investigate|compare",
    },
    {
        "name": "read_repair_retry",
        "steps": ["file_read", "code_execute", "file_read"],
        "trigger": "debug|fix.*error|repair",
    },
    {
        "name": "retrieve_extract_answer",
        "steps": ["web_search", "file_read", "summarize"],
        "trigger": "find|look up|search.*for",
    },
    {
        "name": "draft_review",
        "steps": ["file_read", "summarize", "verify"],
        "trigger": "draft|review|check",
    },
]

class MetaToolMiner:
    """Discover reusable tool-call workflows ("macros") from execution traces.

    A new miner starts with one ``MacroTool`` per ``COMMON_WORKFLOWS`` entry.
    ``mine_traces`` adds macros for tool sequences that recur often enough and
    succeed often enough; ``match_macro`` picks a macro for a request by
    regular-expression trigger.

    State accumulates across ``mine_traces`` calls, so passing the same trace
    twice counts it twice.
    """

    def __init__(self, min_occurrences: int = 3, min_success_rate: float = 0.7):
        """Create a miner pre-loaded with the built-in workflows.

        Args:
            min_occurrences: Minimum number of successful traces sharing a tool
                sequence before that sequence becomes a macro.
            min_success_rate: Minimum fraction (successful / all observed
                traces of the sequence) required for a macro.
        """
        self.min_occurrences = min_occurrences
        self.min_success_rate = min_success_rate
        self.discovered_macros: Dict[str, MacroTool] = {}
        # Successful traces grouped by their "a->b->c" tool-sequence key.
        self._trace_patterns: Dict[str, List[Dict]] = {}
        # Number of traces seen per key, successful or not. This is the
        # denominator of the success rate.
        self._pattern_attempts: Dict[str, int] = {}
        # Pre-load common workflows with fixed placeholder statistics.
        for wf in COMMON_WORKFLOWS:
            self.discovered_macros[wf["name"]] = MacroTool(
                name=wf["name"],
                steps=[{"tool": s} for s in wf["steps"]],
                trigger_pattern=wf["trigger"],
                success_rate=0.8,
                avg_cost=0.05,
                avg_latency_ms=5000,
                usage_count=0,
            )

    def mine_traces(self, traces: List[Dict]) -> List[MacroTool]:
        """Record ``traces`` and return the macros newly discovered from them.

        Each trace is a dict read through these keys: ``"task_success"``
        (bool), ``"steps"`` (list of dicts, each with a ``"tool_calls"`` list
        of dicts carrying ``"tool_name"``), ``"total_cost"`` and
        ``"latency_total_ms"``. Missing keys fall back to ``False``, empty
        lists, ``"unknown"`` and ``0`` respectively.

        Traces with fewer than two tool calls are ignored. Failed traces are
        never stored, but they do count against the success rate of their
        tool sequence.

        Args:
            traces: Execution traces to add to the accumulated history.

        Returns:
            Macros created by this call, in discovery order. Sequences whose
            generated name already exists in ``discovered_macros`` are skipped
            and their existing entry is left untouched.
        """
        for trace in traces:
            tool_seq = [
                tc.get("tool_name", "unknown")
                for step in (trace.get("steps") or [])
                for tc in (step.get("tool_calls") or [])
            ]
            if len(tool_seq) < 2:
                continue
            pattern_key = "->".join(tool_seq)
            # Count every attempt so that failures lower the success rate.
            self._pattern_attempts[pattern_key] = (
                self._pattern_attempts.get(pattern_key, 0) + 1
            )
            if trace.get("task_success", False):
                self._trace_patterns.setdefault(pattern_key, []).append(trace)

        new_macros: List[MacroTool] = []
        for pattern, successes in self._trace_patterns.items():
            n_success = len(successes)
            if n_success == 0 or n_success < self.min_occurrences:
                continue
            # max() keeps the rate <= 1 even if the attempt counter is missing
            # for this key.
            attempts = max(self._pattern_attempts.get(pattern, 0), n_success)
            success_rate = n_success / attempts
            if success_rate < self.min_success_rate:
                continue
            # md5 is used only to derive a short, stable identifier.
            name = f"macro_{hashlib.md5(pattern.encode()).hexdigest()[:8]}"
            if name in self.discovered_macros:
                continue
            macro = MacroTool(
                name=name,
                # NOTE(review): assumes tool names never contain "->".
                steps=[{"tool": t} for t in pattern.split("->")],
                # Mined macros get no trigger, so match_macro never returns them.
                trigger_pattern="",
                success_rate=success_rate,
                avg_cost=sum(t.get("total_cost", 0) for t in successes) / n_success,
                avg_latency_ms=sum(t.get("latency_total_ms", 0) for t in successes) / n_success,
                usage_count=n_success,
            )
            self.discovered_macros[name] = macro
            new_macros.append(macro)
        return new_macros

    def match_macro(self, request: str, task_type: str) -> Optional[MacroTool]:
        """Return the best macro whose trigger matches ``request``, if any.

        The request is lower-cased and each macro's non-empty
        ``trigger_pattern`` is applied with ``re.search``. Matching macros are
        scored as ``success_rate * len(steps)``; the highest strictly positive
        score wins, and the earliest macro wins ties.

        Args:
            request: Free-text request to match.
            task_type: Accepted for interface compatibility; not used.

        Returns:
            The best-scoring macro, or ``None`` when nothing matches.
        """
        import re

        text = request.lower()
        best: Optional[MacroTool] = None
        best_score = 0
        for macro in self.discovered_macros.values():
            if not macro.trigger_pattern:
                continue
            if not re.search(macro.trigger_pattern, text):
                continue
            score = macro.success_rate * len(macro.steps)
            if score > best_score:
                best_score = score
                best = macro
        return best