narcolepticchicken commited on
Commit
a97e900
·
verified ·
1 Parent(s): 2ffdfdb

Upload aco/meta_tool_miner.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. aco/meta_tool_miner.py +80 -194
aco/meta_tool_miner.py CHANGED
@@ -1,200 +1,86 @@
1
- """Meta-Tool / Workflow Compression - Module 9.
2
-
3
- Mines repeated successful traces into reusable workflows.
4
-
5
- Compresses recurring workflows into:
6
- - deterministic scripts
7
- - macro tools
8
- - reusable skills
9
- - cached plans
10
-
11
- Metrics:
12
- - LLM calls saved
13
- - workflow success
14
- - bad automation rate
15
- - regression rate
16
- - latency saved
17
- """
18
-
19
- import hashlib
20
- import json
21
- from typing import Dict, List, Tuple, Optional, Set
22
- from dataclasses import dataclass, field
23
- from collections import defaultdict
24
-
25
- from .trace_schema import AgentTrace, TraceStep, Outcome, TaskType
26
- from .config import ACOConfig
27
-
28
 
29
  @dataclass
30
- class WorkflowPattern:
31
- pattern_id: str
32
- task_type: TaskType
33
- tool_sequence: Tuple[str, ...]
34
- frequency: int
35
- success_count: int
36
- failure_count: int
37
  avg_cost: float
38
  avg_latency_ms: float
39
- compressed_script: Optional[str] = None
40
- is_deterministic: bool = False
41
-
 
 
 
 
 
 
 
 
 
 
 
 
42
 
43
  class MetaToolMiner:
44
- """Discovers and compresses repeated agent workflows into meta-tools."""
45
-
46
- def __init__(self, config: Optional[ACOConfig] = None):
47
- self.config = config or ACOConfig()
48
- self.patterns: Dict[str, WorkflowPattern] = {}
49
- self.meta_tools: Dict[str, Dict] = {} # pattern_id -> meta_tool definition
50
- self.trace_buffer: List[AgentTrace] = []
51
-
52
- def ingest_trace(self, trace: AgentTrace) -> None:
53
- """Add a completed trace for pattern mining."""
54
- if trace.final_outcome not in (Outcome.SUCCESS, Outcome.PARTIAL_SUCCESS):
55
- return # Only mine successful workflows
56
-
57
- self.trace_buffer.append(trace)
58
-
59
- # Extract tool sequences
60
- tool_sequence = tuple(
61
- tc.tool_name
62
- for step in trace.steps
63
- for tc in step.tool_calls
64
- )
65
-
66
- if len(tool_sequence) < 2:
67
- return
68
-
69
- # Hash the sequence
70
- seq_hash = hashlib.md5(json.dumps(tool_sequence).encode()).hexdigest()[:16]
71
-
72
- # Update or create pattern
73
- if seq_hash in self.patterns:
74
- pattern = self.patterns[seq_hash]
75
- pattern.frequency += 1
76
- if trace.final_outcome == Outcome.SUCCESS:
77
- pattern.success_count += 1
78
- else:
79
- pattern.failure_count += 1
80
- pattern.avg_cost = (
81
- pattern.avg_cost * (pattern.frequency - 1) + trace.total_cost_computed
82
- ) / pattern.frequency
83
- pattern.avg_latency_ms = (
84
- pattern.avg_latency_ms * (pattern.frequency - 1) + trace.total_latency_ms
85
- ) / pattern.frequency
86
- else:
87
- self.patterns[seq_hash] = WorkflowPattern(
88
- pattern_id=seq_hash,
89
- task_type=trace.task_type,
90
- tool_sequence=tool_sequence,
91
- frequency=1,
92
- success_count=1 if trace.final_outcome == Outcome.SUCCESS else 0,
93
- failure_count=0 if trace.final_outcome == Outcome.SUCCESS else 1,
94
- avg_cost=trace.total_cost_computed,
95
- avg_latency_ms=trace.total_latency_ms,
96
- )
97
-
98
- def extract_meta_tools(self) -> List[WorkflowPattern]:
99
- """Extract workflow patterns that meet meta-tool thresholds."""
100
- qualified = []
101
- for pattern in self.patterns.values():
102
- success_rate = pattern.success_count / max(pattern.frequency, 1)
103
- if (pattern.frequency >= self.config.meta_tool_min_frequency and
104
- success_rate >= self.config.meta_tool_min_success_rate):
105
-
106
- # Generate deterministic script if possible
107
- if self._is_deterministic_sequence(pattern.tool_sequence):
108
- pattern.is_deterministic = True
109
- pattern.compressed_script = self._generate_script(pattern)
110
-
111
- qualified.append(pattern)
112
-
113
- return sorted(qualified, key=lambda p: p.frequency * p.avg_cost, reverse=True)
114
-
115
- def _is_deterministic_sequence(self, tool_sequence: Tuple[str, ...]) -> bool:
116
- """Check if a tool sequence can be made deterministic."""
117
- # Simple heuristic: if all tools are read-only, it's likely deterministic
118
- read_only_tools = {"file_read", "search", "retrieve", "fetch", "calculator", "linter"}
119
- return all(t in read_only_tools for t in tool_sequence)
120
-
121
- def _generate_script(self, pattern: WorkflowPattern) -> str:
122
- """Generate a deterministic script for a workflow pattern."""
123
- lines = [f"# Meta-tool: {pattern.pattern_id}"]
124
- lines.append(f"# Task type: {pattern.task_type.value}")
125
- lines.append(f"# Success rate: {pattern.success_count / max(pattern.frequency, 1):.1%}")
126
- lines.append(f"# Avg cost: ${pattern.avg_cost:.4f}")
127
- lines.append("")
128
-
129
- for i, tool in enumerate(pattern.tool_sequence):
130
- lines.append(f"step_{i+1} = execute_tool('{tool}', params=auto_resolve())")
131
-
132
- lines.append("return aggregate_results()")
133
- return "\n".join(lines)
134
-
135
- def match_and_compress(
136
- self,
137
- task_type: TaskType,
138
- planned_tools: List[str],
139
- ) -> Optional[Dict]:
140
- """Match current plan against known meta-tools and return compressed plan if found."""
141
- planned_tuple = tuple(planned_tools)
142
-
143
- for pattern in self.patterns.values():
144
- if pattern.task_type != task_type:
145
- continue
146
- if pattern.compressed_script is None:
147
- continue
148
-
149
- # Check if planned tools are a subset or match of pattern
150
- if self._sequence_match(planned_tuple, pattern.tool_sequence):
151
- success_rate = pattern.success_count / max(pattern.frequency, 1)
152
- return {
153
- "use_meta_tool": True,
154
- "meta_tool_id": pattern.pattern_id,
155
- "compressed_script": pattern.compressed_script,
156
- "estimated_cost_savings": pattern.avg_cost * 0.3, # meta-tools save ~30%
157
- "estimated_latency_savings_ms": pattern.avg_latency_ms * 0.3,
158
- "success_rate": success_rate,
159
- "fallback_tools": list(pattern.tool_sequence),
160
- }
161
-
162
- return None
163
-
164
- def _sequence_match(self, planned: Tuple[str, ...], pattern: Tuple[str, ...]) -> bool:
165
- """Check if planned sequence matches or is contained in pattern."""
166
- if planned == pattern:
167
- return True
168
- # Allow prefix match if pattern is longer
169
- if len(pattern) >= len(planned) and pattern[:len(planned)] == planned:
170
- return True
171
- return False
172
-
173
- def get_stats(self) -> Dict:
174
- """Return meta-tool mining statistics."""
175
- total_patterns = len(self.patterns)
176
- qualified = self.extract_meta_tools()
177
- total_traces = len(self.trace_buffer)
178
-
179
- total_llm_calls_saved = sum(
180
- p.frequency * len(p.tool_sequence) * 0.5 # each meta-tool saves ~50% LLM calls
181
- for p in qualified
182
- )
183
-
184
- return {
185
- "total_patterns": total_patterns,
186
- "qualified_meta_tools": len(qualified),
187
- "total_traces_mined": total_traces,
188
- "estimated_llm_calls_saved": total_llm_calls_saved,
189
- "top_patterns": [
190
- {
191
- "pattern_id": p.pattern_id,
192
- "task_type": p.task_type.value,
193
- "tool_sequence": p.tool_sequence,
194
- "frequency": p.frequency,
195
- "success_rate": p.success_count / max(p.frequency, 1),
196
- "avg_cost": p.avg_cost,
197
- }
198
- for p in qualified[:5]
199
- ],
200
- }
 
1
+ """Meta-Tool Miner: Mine repeated successful traces into reusable workflows."""
2
+ from typing import Dict, List, Optional, Tuple
3
+ from dataclasses import dataclass
4
+ import json, hashlib
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
 
6
  @dataclass
7
+ class MacroTool:
8
+ name: str
9
+ steps: List[Dict] # sequence of tool calls
10
+ trigger_pattern: str
11
+ success_rate: float
 
 
12
  avg_cost: float
13
  avg_latency_ms: float
14
+ usage_count: int
15
+ last_used: str = ""
16
+
17
+ COMMON_WORKFLOWS = [
18
+ {"name":"search_inspect_patch","steps":["code_search","file_read","file_write"],
19
+ "trigger":"fix.*bug|implement.*feature|refactor"},
20
+ {"name":"research_answer_verify","steps":["web_search","summarize","verify"],
21
+ "trigger":"research|investigate|compare"},
22
+ {"name":"read_repair_retry","steps":["file_read","code_execute","file_read"],
23
+ "trigger":"debug|fix.*error|repair"},
24
+ {"name":"retrieve_extract_answer","steps":["web_search","file_read","summarize"],
25
+ "trigger":"find|look up|search.*for"},
26
+ {"name":"draft_review","steps":["file_read","summarize","verify"],
27
+ "trigger":"draft|review|check"},
28
+ ]
29
 
30
  class MetaToolMiner:
31
+ def __init__(self, min_occurrences: int = 3, min_success_rate: float = 0.7):
32
+ self.min_occurrences = min_occurrences
33
+ self.min_success_rate = min_success_rate
34
+ self.discovered_macros: Dict[str, MacroTool] = {}
35
+ self._trace_patterns: Dict[str, List[Dict]] = {}
36
+ # Pre-load common workflows
37
+ for wf in COMMON_WORKFLOWS:
38
+ self.discovered_macros[wf["name"]] = MacroTool(
39
+ name=wf["name"], steps=[{"tool":s} for s in wf["steps"]],
40
+ trigger_pattern=wf["trigger"], success_rate=0.8,
41
+ avg_cost=0.05, avg_latency_ms=5000, usage_count=0)
42
+
43
+ def mine_traces(self, traces: List[Dict]) -> List[MacroTool]:
44
+ for trace in traces:
45
+ if not trace.get("task_success", False): continue
46
+ tool_seq = []
47
+ for step in trace.get("steps", []):
48
+ for tc in step.get("tool_calls", []):
49
+ tool_seq.append(tc.get("tool_name","unknown"))
50
+ if len(tool_seq) >= 2:
51
+ pattern_key = "->".join(tool_seq)
52
+ if pattern_key not in self._trace_patterns:
53
+ self._trace_patterns[pattern_key] = []
54
+ self._trace_patterns[pattern_key].append(trace)
55
+ new_macros = []
56
+ for pattern, pattern_traces in self._trace_patterns.items():
57
+ if len(pattern_traces) < self.min_occurrences: continue
58
+ success_rate = sum(1 for t in pattern_traces if t.get("task_success",False)) / len(pattern_traces)
59
+ if success_rate < self.min_success_rate: continue
60
+ name = f"macro_{hashlib.md5(pattern.encode()).hexdigest()[:8]}"
61
+ if name in self.discovered_macros: continue
62
+ macro = MacroTool(
63
+ name=name,
64
+ steps=[{"tool":t} for t in pattern.split("->")],
65
+ trigger_pattern="",
66
+ success_rate=success_rate,
67
+ avg_cost=sum(t.get("total_cost",0) for t in pattern_traces) / len(pattern_traces),
68
+ avg_latency_ms=sum(t.get("latency_total_ms",0) for t in pattern_traces) / len(pattern_traces),
69
+ usage_count=len(pattern_traces))
70
+ self.discovered_macros[name] = macro
71
+ new_macros.append(macro)
72
+ return new_macros
73
+
74
+ def match_macro(self, request: str, task_type: str) -> Optional[MacroTool]:
75
+ import re
76
+ r = request.lower()
77
+ best = None
78
+ best_score = 0
79
+ for macro in self.discovered_macros.values():
80
+ if macro.trigger_pattern:
81
+ if re.search(macro.trigger_pattern, r):
82
+ score = macro.success_rate * len(macro.steps)
83
+ if score > best_score:
84
+ best_score = score
85
+ best = macro
86
+ return best