| """Cost Telemetry Collector - Module 1.""" |
|
|
| import json |
| import os |
| from typing import List, Dict, Any, Optional |
| from datetime import datetime |
| from pathlib import Path |
|
|
| from .trace_schema import AgentTrace, TraceStep, ModelCall, ToolCall, VerifierCall, TaskType, Outcome, FailureTag |
|
|
|
|
class CostTelemetryCollector:
    """Collect structured telemetry from agent runs and persist normalized traces.

    A trace is built in memory (``start_trace`` -> ``add_step`` ->
    ``finalize_trace``) and written to ``<storage_path>/<trace_id>.json`` when
    it is finalized. Persisted traces are read back as plain JSON data, not
    as ``AgentTrace`` objects.
    """

    def __init__(self, storage_path: str = "./traces"):
        """Create the collector and make sure the storage directory exists.

        Args:
            storage_path: Directory that holds one ``<trace_id>.json`` file
                per finalized trace. Created (with parents) if missing.
        """
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        # Traces that were started but not finalized yet, keyed by trace id.
        self._pending: Dict[str, AgentTrace] = {}

    def start_trace(self, trace_id: str, user_request: str, task_type: TaskType) -> AgentTrace:
        """Create a new in-memory trace and register it as pending.

        A pending trace that already uses ``trace_id`` is replaced.

        Args:
            trace_id: Identifier of the trace; also used as the file stem
                when the trace is persisted.
            user_request: Passed through to ``AgentTrace``.
            task_type: Passed through to ``AgentTrace`` and copied onto every
                step added later.

        Returns:
            The newly created ``AgentTrace``.
        """
        trace = AgentTrace(
            trace_id=trace_id,
            user_request=user_request,
            task_type=task_type,
        )
        self._pending[trace_id] = trace
        return trace

    def add_step(
        self,
        trace_id: str,
        step_id: str,
        model_call: ModelCall,
        tool_calls: Optional[List[ToolCall]] = None,
        verifier_calls: Optional[List[VerifierCall]] = None,
        context_size_tokens: int = 0,
        context_sources: Optional[List[str]] = None,
        retry_count: int = 0,
        recovery_action: Optional[str] = None,
        artifacts_created: Optional[List[str]] = None,
        step_outcome: Optional[Outcome] = None,
    ) -> None:
        """Append one step to a pending trace.

        List arguments left as ``None`` are stored as empty lists. The step
        inherits ``task_type`` from its trace.

        Raises:
            ValueError: If no pending trace is registered under ``trace_id``.
        """
        trace = self._pending.get(trace_id)
        # Compare against None explicitly so the lookup result is never
        # mistaken for "missing" just because the trace object is falsy.
        if trace is None:
            raise ValueError(f"Trace {trace_id} not found")

        step = TraceStep(
            step_id=step_id,
            # NOTE(review): utcnow() yields a naive datetime and is deprecated
            # since Python 3.12. It is kept so the persisted timestamp format
            # does not change; switch to an aware datetime once readers of
            # the trace files are confirmed to accept a UTC offset.
            timestamp=datetime.utcnow(),
            task_type=trace.task_type,
            model_call=model_call,
            tool_calls=tool_calls or [],
            verifier_calls=verifier_calls or [],
            context_size_tokens=context_size_tokens,
            context_sources=context_sources or [],
            retry_count=retry_count,
            recovery_action=recovery_action,
            artifacts_created=artifacts_created or [],
            step_outcome=step_outcome,
        )
        trace.steps.append(step)

    def finalize_trace(
        self,
        trace_id: str,
        final_outcome: Outcome,
        failure_tags: Optional[List[FailureTag]] = None,
        user_satisfaction: Optional[float] = None,
        total_cost_saved_vs_frontier: Optional[float] = None,
        optimal_cost: Optional[float] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> AgentTrace:
        """Fill in the summary fields of a pending trace and persist it.

        ``total_cost`` is copied from ``trace.total_cost_computed``. The trace
        is removed from the pending set only after it has been written, so a
        failed write leaves it available for another attempt.

        Returns:
            The finalized trace.

        Raises:
            KeyError: If no pending trace is registered under ``trace_id``.
        """
        try:
            trace = self._pending[trace_id]
        except KeyError:
            raise KeyError(f"Trace {trace_id} not found") from None

        trace.final_outcome = final_outcome
        trace.failure_tags = failure_tags or []
        trace.user_satisfaction = user_satisfaction
        trace.total_cost_saved_vs_frontier = total_cost_saved_vs_frontier
        trace.optimal_cost = optimal_cost
        trace.metadata = metadata or {}
        trace.total_cost = trace.total_cost_computed

        self._persist(trace)
        # Drop the trace only once it is safely on disk.
        self._pending.pop(trace_id, None)
        return trace

    def _persist(self, trace: AgentTrace) -> None:
        """Write ``trace.to_dict()`` as JSON to ``<storage_path>/<trace_id>.json``."""
        # Serialize before touching the file: if to_dict()/json fails, no
        # truncated file is left behind for load_trace() to choke on.
        # default=str stringifies any value json cannot encode natively.
        payload = json.dumps(trace.to_dict(), indent=2, default=str)
        filepath = self.storage_path / f"{trace.trace_id}.json"
        filepath.write_text(payload, encoding="utf-8")

    def load_trace(self, trace_id: str) -> Optional[Dict[str, Any]]:
        """Load a persisted trace as decoded JSON data.

        The result is whatever the JSON file contains (a dict for files
        written by ``_persist``, assuming ``to_dict()`` returns a dict); it is
        not converted back into an ``AgentTrace``.

        Returns:
            The decoded JSON content, or ``None`` if no file exists for
            ``trace_id``.
        """
        filepath = self.storage_path / f"{trace_id}.json"
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def list_traces(self) -> List[str]:
        """Return the ids (file stems) of all ``*.json`` files in the storage directory."""
        return [p.stem for p in self.storage_path.glob("*.json")]

    def get_stats(self) -> Dict[str, Any]:
        """Compute aggregate statistics over all persisted traces.

        Only non-empty JSON objects count as trace records; every average
        uses that same record count as its denominator. Missing or ``null``
        ``total_cost`` / ``steps`` values count as zero.

        Returns:
            ``{"count": 0}`` when there are no records, otherwise a dict with
            ``count``, ``avg_cost``, ``avg_steps`` and ``success_rate``.
        """
        records = []
        for tid in self.list_traces():
            data = self.load_trace(tid)
            if isinstance(data, dict) and data:
                records.append(data)

        if not records:
            return {"count": 0}

        count = len(records)
        total_cost = sum(r.get("total_cost") or 0 for r in records)
        total_steps = sum(len(r.get("steps") or []) for r in records)
        # NOTE(review): assumes final_outcome is serialized as the string
        # "success" - confirm against AgentTrace.to_dict() / Outcome.
        successes = sum(1 for r in records if r.get("final_outcome") == "success")

        return {
            "count": count,
            "avg_cost": total_cost / count,
            "avg_steps": total_steps / count,
            "success_rate": successes / count,
        }
|
|