# agent-cost-optimizer — tests/test_integration.py
# (upload metadata: narcolepticchicken, revision 5c39bf9)
"""End-to-end integration test for ACO.
Demonstrates the full pipeline:
1. Classify task type and difficulty
2. Route to appropriate model tier
3. Budget context
4. Decide tool usage
5. Decide verification
6. Handle failure/retry
7. Detect doom
8. Collect telemetry
"""
import json, sys, os
# Add parent to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from aco.classifier import TaskCostClassifier
from aco.context_budgeter import ContextBudgeter
from aco.tool_gate import ToolUseGate
from aco.verifier_budgeter import VerifierBudgeter
from aco.retry_optimizer import RetryOptimizer
from aco.doom_detector import DoomDetector
from aco.telemetry import TelemetryCollector
from aco.cache_layout import CacheAwareLayout
from aco.meta_tool_miner import MetaToolMiner
from aco.trace_schema import AgentTrace, TraceStep, ModelCall, ToolCall
from aco.conformal import ConformalEscalationCalibrator
from aco.pareto import build_frontier_report
def test_full_pipeline():
    """Run the full ACO pipeline end-to-end on a set of sample tasks.

    Exercises, in order: task classification, tier routing, conformal
    escalation, context budgeting, cache-aware layout, tool gating,
    verification budgeting, telemetry collection, doom detection, retry
    recommendation, meta-tool mining, and Pareto-frontier reporting.

    Returns:
        list[dict]: one summary record per sample task (task text, type,
        difficulty, routed tier, context tokens, cache prefix tokens,
        recommended tool count, and whether verification was requested).
    """
    print("=" * 60)
    print("ACO END-TO-END INTEGRATION TEST")
    print("=" * 60)

    # Initialize all modules. Only the telemetry collector accumulates
    # state across tasks; everything else is queried per task.
    classifier = TaskCostClassifier()
    budgeter = ContextBudgeter()
    tool_gate = ToolUseGate()
    verifier = VerifierBudgeter()
    retry = RetryOptimizer()
    doom = DoomDetector()
    telemetry = TelemetryCollector()
    cache = CacheAwareLayout()
    miner = MetaToolMiner()
    conformal = ConformalEscalationCalibrator(alpha=0.05)

    # Sample tasks spanning distinct task types / difficulty levels.
    tasks = [
        "Fix the typo in the README",
        "Implement user authentication with JWT tokens",
        "Research the latest advances in transformer architectures",
        "Review this contract for GDPR compliance issues",
        "Deploy the new microservice to production",
        "Write a Python function to sort a list",
    ]

    results = []
    for task in tasks:
        print(f"\n--- Task: {task[:50]}... ---")

        # 1. Classify: task type, difficulty, risk, and cost estimate.
        cls = classifier.classify(task)
        print(f"  Type: {cls['task_type']}, Difficulty: {cls['difficulty']}")
        print(f"  Risk: {cls['risk_level']}, Needs retrieval: {cls['needs_retrieval']}")
        print(f"  Needs verifier: {cls['needs_verifier']}, Est cost: ${cls['estimated_cost']:.2f}")

        # 2. Route using the classifier's recommended model tier.
        tier = cls['recommended_tier']
        print(f"  Routed to tier: {tier}")

        # 3. Conformal check: escalate one tier (capped at 5) when the
        #    calibrator judges the predicted success probability too low.
        psuccess = 0.7 + 0.05 * tier  # placeholder success estimate
        should_esc = conformal.should_escalate(tier, psuccess)
        if should_esc:
            tier = min(tier + 1, 5)
            print(f"  Conformal escalation → tier {tier}")

        # 4. Budget context: split candidate context into keep/summarize.
        ctx = budgeter.budget(task, cls['task_type'], tier)
        print(f"  Context: {ctx.total_tokens} tokens ({len(ctx.keep)} keep, {len(ctx.summarize)} summarize)")

        # 5. Cache layout: stable (cacheable) prefix vs. dynamic suffix.
        prompt = cache.layout(task, ctx)
        print(f"  Cache: {prompt.prefix_tokens} prefix (cacheable), {prompt.suffix_tokens} suffix (dynamic)")

        # 6. Tool gate: which tools to use, skip, and whether to batch.
        tools = tool_gate.evaluate(task, cls['task_type'], tier)
        print(f"  Tools: {len(tools.recommended)} recommended, {len(tools.skipped)} skipped, batch={tools.should_batch}")

        # 7. Verifier budgeting decision.
        v = verifier.should_verify(task, cls['task_type'], tier, psuccess)
        print(f"  Verifier: {v.should_verify} (risk={v.risk_level}, reason={v.reason})")

        # 8. Collect telemetry for this routing step (at most 3 tools).
        step = TraceStep(
            step_type="route",
            model_call=ModelCall(tier=tier, psuccess=psuccess),
            tool_calls=[ToolCall(name=t) for t in tools.recommended[:3]],
        )
        telemetry.record(task, cls, step)

        results.append({
            "task": task,
            "type": cls['task_type'],
            "difficulty": cls['difficulty'],
            "tier": tier,
            "context_tokens": ctx.total_tokens,
            "cache_prefix": prompt.prefix_tokens,
            "tools_recommended": len(tools.recommended),
            "verifier_needed": v.should_verify,
        })

    # 9. Doom detector: a run state with repeated failures, nonzero cost,
    #    and no artifacts should be flagged as (likely) doomed.
    print("\n--- Doom Detector Test ---")
    from aco.doom_detector import RunState  # local import: only needed here
    state = RunState(
        failed_tool_calls=4,
        total_cost=2.50,
        artifacts_created=0,
        retries=3,
        context_tokens=80000,
    )
    doom_result = doom.check(state)
    print(f"  State: {state.failed_tool_calls} failed tools, {state.total_cost} cost, {state.artifacts_created} artifacts")
    print(f"  Doom: {doom_result.is_doomed}, Score: {doom_result.doom_score:.2f}")
    print(f"  Action: {doom_result.recommended_action}")

    # 10. Retry optimizer: recovery recommendations for two failure modes.
    print("\n--- Retry Optimizer Test ---")
    recovery = retry.recommend("tool_call_failed", "SyntaxError in generated code")
    print("  Failure: tool_call_failed")
    print(f"  Recovery: {recovery.action} (strategy={recovery.strategy})")
    recovery2 = retry.recommend("model_overloaded", "Rate limit exceeded")
    print("  Failure: model_overloaded")
    print(f"  Recovery: {recovery2.action}")

    # 11. Meta-tool miner: compress a frequent trace pattern into a macro.
    print("\n--- Meta-Tool Miner Test ---")
    from aco.meta_tool_miner import TracePattern  # local import: only needed here
    pattern = TracePattern(
        name="search_read_patch_test",
        steps=["search", "read_file", "edit_file", "run_test"],
        frequency=47,
        success_rate=0.82,
    )
    macro = miner.compress(pattern)
    print(f"  Pattern: {pattern.name} (freq={pattern.frequency}, sr={pattern.success_rate})")
    print(f"  Macro: {macro.name}, steps={macro.steps}, savings={macro.estimated_savings:.1f} LLM calls")

    # 12. Pareto frontier over (quality, cost) points for several policies.
    print("\n--- Pareto Frontier Test ---")
    policies = {
        "oracle": (0.870, 0.062),
        "v10_feedback": (0.848, 0.201),
        "frontier": (0.782, 0.317),
        "v10_direct": (0.766, 0.188),
        "always_cheap": (0.632, 0.014),
    }
    report = build_frontier_report(policies)
    print(f"  AIQ: {report['aiq']}")
    for level, cost in report['frontier_quality_levels'].items():
        print(f"    {level}: ${cost:.3f}")

    # Summary statistics over the per-task records.
    print(f"\n{'='*60}")
    print("INTEGRATION TEST COMPLETE")
    print(f"{'='*60}")
    print(f"Processed {len(results)} tasks")
    tier_dist = {}
    for r in results:
        tier_dist[r['tier']] = tier_dist.get(r['tier'], 0) + 1
    print(f"Tier distribution: {tier_dist}")
    verified = sum(1 for r in results if r['verifier_needed'])
    print(f"Verifier calls: {verified}/{len(results)} ({verified/len(results)*100:.0f}%)")
    avg_ctx = sum(r['context_tokens'] for r in results) / len(results)
    print(f"Average context: {avg_ctx:.0f} tokens")
    return results
if __name__ == "__main__":
    # Allow running the integration test directly as a script
    # (in addition to pytest collection).
    test_full_pipeline()