narcolepticchicken committed on
Commit
33d6b64
·
verified ·
1 Parent(s): 2551fef

Upload aco/meta_tool_miner.py

Browse files
Files changed (1) hide show
  1. aco/meta_tool_miner.py +200 -0
aco/meta_tool_miner.py ADDED
@@ -0,0 +1,200 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Meta-Tool / Workflow Compression - Module 9.
2
+
3
+ Mines repeated successful traces into reusable workflows.
4
+
5
+ Compresses recurring workflows into:
6
+ - deterministic scripts
7
+ - macro tools
8
+ - reusable skills
9
+ - cached plans
10
+
11
+ Metrics:
12
+ - LLM calls saved
13
+ - workflow success
14
+ - bad automation rate
15
+ - regression rate
16
+ - latency saved
17
+ """
18
+
19
+ import hashlib
20
+ import json
21
+ from typing import Dict, List, Tuple, Optional, Set
22
+ from dataclasses import dataclass, field
23
+ from collections import defaultdict
24
+
25
+ from .trace_schema import AgentTrace, TraceStep, Outcome, TaskType
26
+ from .config import ACOConfig
27
+
28
+
29
@dataclass
class WorkflowPattern:
    """Aggregate record for one recurring tool-call sequence.

    Instances are created and updated by ``MetaToolMiner.ingest_trace`` and
    later enriched by ``MetaToolMiner.extract_meta_tools``.
    """

    # --- identity -----------------------------------------------------
    pattern_id: str  # truncated hash used as the key in the miner's pattern table
    task_type: TaskType  # task type of the trace that first produced the pattern
    tool_sequence: Tuple[str, ...]  # tool names in call order

    # --- counters -----------------------------------------------------
    frequency: int  # number of ingested traces that produced this sequence
    success_count: int  # traces whose final outcome was SUCCESS
    failure_count: int  # ingested traces whose final outcome was not SUCCESS

    # --- running means over all ingested traces -------------------------
    avg_cost: float
    avg_latency_ms: float

    # --- filled in once the pattern qualifies as a meta-tool ------------
    compressed_script: Optional[str] = None  # generated script text, if any
    is_deterministic: bool = False  # set when every tool is in the read-only list
41
+
42
+
43
class MetaToolMiner:
    """Discovers and compresses repeated agent workflows into meta-tools.

    Successful (or partially successful) traces are reduced to their ordered
    tool-name sequence. Sequences that recur often enough, with a high enough
    success rate, qualify as meta-tools; sequences made only of read-only
    tools additionally get a generated script.
    """

    # Tools the determinism heuristic treats as read-only.
    _READ_ONLY_TOOLS = frozenset(
        {"file_read", "search", "retrieve", "fetch", "calculator", "linter"}
    )
    # Heuristic: a meta-tool is assumed to save ~30% of cost and latency.
    _SAVINGS_FRACTION = 0.3
    # Heuristic: a meta-tool is assumed to save ~50% of LLM calls.
    _LLM_CALLS_SAVED_FRACTION = 0.5

    def __init__(self, config: Optional[ACOConfig] = None):
        """Create an empty miner.

        Args:
            config: Thresholds for meta-tool qualification. A default
                ``ACOConfig()`` is built only when ``None`` is passed, so an
                explicitly supplied config is never discarded.
        """
        self.config = config if config is not None else ACOConfig()
        self.patterns: Dict[str, WorkflowPattern] = {}
        self.meta_tools: Dict[str, Dict] = {}  # pattern_id -> meta_tool definition
        self.trace_buffer: List[AgentTrace] = []

    @staticmethod
    def _success_rate(pattern) -> float:
        """Return success_count / frequency, guarding against frequency == 0."""
        return pattern.success_count / max(pattern.frequency, 1)

    @staticmethod
    def _pattern_key(task_type, tool_sequence) -> str:
        """Return the 16-hex-char pattern id for a (task type, tool sequence) pair.

        The task type is part of the key so that the same tool sequence seen
        under two task types yields two patterns rather than being merged
        into whichever task type was ingested first.
        """
        task_key = str(getattr(task_type, "value", task_type))
        payload = json.dumps([task_key, list(tool_sequence)])
        # MD5 is used purely as a compact fingerprint, not for security.
        return hashlib.md5(payload.encode()).hexdigest()[:16]

    def ingest_trace(self, trace: AgentTrace) -> None:
        """Add a completed trace for pattern mining.

        Traces whose final outcome is neither SUCCESS nor PARTIAL_SUCCESS are
        ignored. Accepted traces are always appended to ``trace_buffer``; only
        those with at least two tool calls create or update a pattern.
        A PARTIAL_SUCCESS trace increments ``failure_count``.
        """
        if trace.final_outcome not in (Outcome.SUCCESS, Outcome.PARTIAL_SUCCESS):
            return  # Only mine successful workflows

        self.trace_buffer.append(trace)

        # Flatten every step's tool calls into one ordered sequence of names.
        tool_sequence = tuple(
            tc.tool_name
            for step in trace.steps
            for tc in step.tool_calls
        )
        if len(tool_sequence) < 2:
            return  # a single call is not a workflow worth compressing

        key = self._pattern_key(trace.task_type, tool_sequence)
        succeeded = trace.final_outcome == Outcome.SUCCESS

        pattern = self.patterns.get(key)
        if pattern is None:
            self.patterns[key] = WorkflowPattern(
                pattern_id=key,
                task_type=trace.task_type,
                tool_sequence=tool_sequence,
                frequency=1,
                success_count=1 if succeeded else 0,
                failure_count=0 if succeeded else 1,
                avg_cost=trace.total_cost_computed,
                avg_latency_ms=trace.total_latency_ms,
            )
            return

        pattern.frequency += 1
        if succeeded:
            pattern.success_count += 1
        else:
            pattern.failure_count += 1

        # Incremental mean: previous mean weighted by the previous count.
        previous = pattern.frequency - 1
        pattern.avg_cost = (
            pattern.avg_cost * previous + trace.total_cost_computed
        ) / pattern.frequency
        pattern.avg_latency_ms = (
            pattern.avg_latency_ms * previous + trace.total_latency_ms
        ) / pattern.frequency

    def extract_meta_tools(self) -> List[WorkflowPattern]:
        """Extract workflow patterns that meet meta-tool thresholds.

        A pattern qualifies when its frequency and success rate reach
        ``config.meta_tool_min_frequency`` and
        ``config.meta_tool_min_success_rate``. Qualifying patterns made only
        of read-only tools are marked deterministic and given a script as a
        side effect.

        Returns:
            Qualifying patterns, ordered by ``frequency * avg_cost`` descending.
        """
        min_frequency = self.config.meta_tool_min_frequency
        min_success_rate = self.config.meta_tool_min_success_rate

        qualified = []
        for pattern in self.patterns.values():
            if pattern.frequency < min_frequency:
                continue
            if self._success_rate(pattern) < min_success_rate:
                continue

            # Generate deterministic script if possible
            if self._is_deterministic_sequence(pattern.tool_sequence):
                pattern.is_deterministic = True
                pattern.compressed_script = self._generate_script(pattern)

            qualified.append(pattern)

        return sorted(qualified, key=lambda p: p.frequency * p.avg_cost, reverse=True)

    def _is_deterministic_sequence(self, tool_sequence: Tuple[str, ...]) -> bool:
        """Check if a tool sequence can be made deterministic.

        Simple heuristic: a non-empty sequence whose tools are all in the
        read-only list is considered deterministic.
        """
        if not tool_sequence:
            return False
        return all(tool in self._READ_ONLY_TOOLS for tool in tool_sequence)

    def _generate_script(self, pattern: WorkflowPattern) -> str:
        """Generate a deterministic script for a workflow pattern.

        The script is a comment header followed by one ``execute_tool`` line
        per tool and a final ``return aggregate_results()`` line.
        """
        lines = [
            f"# Meta-tool: {pattern.pattern_id}",
            f"# Task type: {pattern.task_type.value}",
            f"# Success rate: {self._success_rate(pattern):.1%}",
            f"# Avg cost: ${pattern.avg_cost:.4f}",
            "",
        ]
        # !r quotes and escapes the tool name, so a name containing a quote
        # character cannot break out of the string literal in the script.
        lines.extend(
            f"step_{index} = execute_tool({tool!r}, params=auto_resolve())"
            for index, tool in enumerate(pattern.tool_sequence, start=1)
        )
        lines.append("return aggregate_results()")
        return "\n".join(lines)

    def match_and_compress(
        self,
        task_type: TaskType,
        planned_tools: List[str],
    ) -> Optional[Dict]:
        """Match current plan against known meta-tools and return compressed plan if found.

        Only patterns of the same task type that already have a generated
        script are considered. When several patterns match, an exact sequence
        match wins over a prefix match, then the higher success rate, then
        the higher frequency.

        Returns:
            A dict describing the meta-tool to use, or ``None`` when the plan
            is empty or nothing matches.
        """
        planned_tuple = tuple(planned_tools)
        if not planned_tuple:
            return None  # an empty plan must not match every pattern

        candidates = [
            pattern
            for pattern in self.patterns.values()
            if pattern.task_type == task_type
            and pattern.compressed_script is not None
            and self._sequence_match(planned_tuple, pattern.tool_sequence)
        ]
        if not candidates:
            return None

        best = max(
            candidates,
            key=lambda p: (
                p.tool_sequence == planned_tuple,
                self._success_rate(p),
                p.frequency,
            ),
        )
        return {
            "use_meta_tool": True,
            "meta_tool_id": best.pattern_id,
            "compressed_script": best.compressed_script,
            "estimated_cost_savings": best.avg_cost * self._SAVINGS_FRACTION,
            "estimated_latency_savings_ms": best.avg_latency_ms * self._SAVINGS_FRACTION,
            "success_rate": self._success_rate(best),
            "fallback_tools": list(best.tool_sequence),
        }

    def _sequence_match(self, planned: Tuple[str, ...], pattern: Tuple[str, ...]) -> bool:
        """Check if a non-empty planned sequence equals, or is a prefix of, pattern."""
        if not planned:
            return False
        # Slicing covers both cases: equal sequences and a proper prefix.
        # A pattern shorter than the plan yields a shorter slice and fails.
        return pattern[:len(planned)] == planned

    def get_stats(self) -> Dict:
        """Return meta-tool mining statistics.

        Calls ``extract_meta_tools`` and therefore shares its side effect of
        generating scripts for qualifying deterministic patterns.
        """
        qualified = self.extract_meta_tools()

        total_llm_calls_saved = sum(
            p.frequency * len(p.tool_sequence) * self._LLM_CALLS_SAVED_FRACTION
            for p in qualified
        )

        return {
            "total_patterns": len(self.patterns),
            "qualified_meta_tools": len(qualified),
            "total_traces_mined": len(self.trace_buffer),
            "estimated_llm_calls_saved": total_llm_calls_saved,
            "top_patterns": [
                {
                    "pattern_id": p.pattern_id,
                    "task_type": p.task_type.value,
                    "tool_sequence": p.tool_sequence,
                    "frequency": p.frequency,
                    "success_rate": self._success_rate(p),
                    "avg_cost": p.avg_cost,
                }
                for p in qualified[:5]
            ],
        }