""" Observability — Callback hooks, cost tracking, and span tracing. Provides production-grade visibility into agent runs: - Token & cost tracking per step, per task, per agent - Callback hooks for custom integrations (LangSmith, Arize, custom dashboards) - Structured event logging - Performance profiling Lightweight — no external dependencies. Integrates with OpenTelemetry-compatible systems via the callback interface. """ from __future__ import annotations import json import logging import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Protocol logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Cost Tracking # --------------------------------------------------------------------------- # Approximate cost per 1M tokens (input) for common models MODEL_COSTS_PER_1M_INPUT = { # Cloud LLMs "gpt-4o": 2.50, "gpt-4o-mini": 0.15, "claude-3-5-sonnet": 3.00, "claude-3-5-haiku": 0.80, # Cloud SLMs via inference providers "qwen/qwen3-32b": 0.20, "qwen/qwen3-8b": 0.05, "meta-llama/llama-3.1-8b-instruct": 0.05, # Local models (electricity cost estimate per 1M tokens) "local-gpu": 0.01, # ~$0.01/1M tokens on consumer GPU "local-cpu": 0.005, # ~$0.005/1M tokens on CPU "ollama": 0.005, # Estimate for local Ollama } @dataclass class TokenUsage: """Token usage for a single LLM call.""" prompt_tokens: int = 0 completion_tokens: int = 0 total_tokens: int = 0 model: str = "" estimated_cost_usd: float = 0.0 timestamp: float = field(default_factory=time.time) @dataclass class CostTracker: """ Tracks token usage and estimated costs across all LLM calls. Usage: tracker = CostTracker(model_name="qwen3:1.7b", cost_per_1m=0.005) tracker.record(prompt_tokens=500, completion_tokens=200) print(tracker.summary()) """ model_name: str = "unknown" cost_per_1m_input: float = 0.01 cost_per_1m_output: float = 0.02 calls: list[TokenUsage] = field(default_factory=list) def record( self, prompt_tokens: int = 0, completion_tokens: int = 0, model: str | None = None, ) -> TokenUsage: """Record a single LLM call.""" total = prompt_tokens + completion_tokens cost = ( prompt_tokens * self.cost_per_1m_input / 1_000_000 + completion_tokens * self.cost_per_1m_output / 1_000_000 ) usage = TokenUsage( prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=total, model=model or self.model_name, estimated_cost_usd=cost, ) self.calls.append(usage) return usage @property def total_tokens(self) -> int: return sum(c.total_tokens for c in self.calls) @property def total_cost_usd(self) -> float: return sum(c.estimated_cost_usd for c in self.calls) @property def total_calls(self) -> int: return len(self.calls) def summary(self) -> dict[str, Any]: return { "model": self.model_name, "total_calls": self.total_calls, "total_tokens": self.total_tokens, "prompt_tokens": sum(c.prompt_tokens for c in self.calls), "completion_tokens": sum(c.completion_tokens for c in self.calls), "estimated_cost_usd": round(self.total_cost_usd, 6), } def reset(self): self.calls.clear() # --------------------------------------------------------------------------- # Callback System # --------------------------------------------------------------------------- class EventType(Enum): """Events emitted during agent execution.""" TASK_START = "task_start" TASK_END = "task_end" STEP_START = "step_start" STEP_END = "step_end" ACTION_DECIDED = "action_decided" TOOL_CALLED = "tool_called" TOOL_RESULT = "tool_result" STATE_EVALUATED = "state_evaluated" LLM_CALL_START = "llm_call_start" LLM_CALL_END = "llm_call_end" HEURISTIC_LEARNED = "heuristic_learned" MEMORY_UPDATED = "memory_updated" OPTIMIZATION_START = "optimization_start" OPTIMIZATION_END = "optimization_end" ERROR = "error" CHECKPOINT = "checkpoint" HUMAN_INPUT_REQUESTED = "human_input_requested" HUMAN_INPUT_RECEIVED = "human_input_received" @dataclass class AgentEvent: """A structured event emitted during agent execution.""" event_type: EventType data: dict[str, Any] = field(default_factory=dict) step: int = 0 task_id: str = "" agent_id: str = "" timestamp: float = field(default_factory=time.time) duration_s: float = 0.0 def to_dict(self) -> dict[str, Any]: return { "event": self.event_type.value, "data": self.data, "step": self.step, "task_id": self.task_id, "agent_id": self.agent_id, "timestamp": self.timestamp, "duration_s": self.duration_s, } def to_json(self) -> str: return json.dumps(self.to_dict(), default=str) class AgentCallback(Protocol): """Protocol for agent callbacks. Implement this to integrate with external systems.""" def on_event(self, event: AgentEvent) -> None: """Called when an event occurs during agent execution.""" ... class LoggingCallback: """Simple callback that logs all events.""" def __init__(self, level: int = logging.INFO): self.level = level self.events: list[AgentEvent] = [] def on_event(self, event: AgentEvent) -> None: self.events.append(event) logger.log( self.level, f"[{event.event_type.value}] step={event.step} " f"task={event.task_id} {json.dumps(event.data, default=str)[:200]}", ) class JSONFileCallback: """Callback that writes events to a JSON Lines file.""" def __init__(self, path: str): self.path = path def on_event(self, event: AgentEvent) -> None: with open(self.path, "a") as f: f.write(event.to_json() + "\n") class MetricsCollector: """ Callback that collects aggregate metrics for analysis. Usage: collector = MetricsCollector() # ... run tasks with collector as callback ... print(collector.summary()) """ def __init__(self): self.tasks: list[dict] = [] self.steps: list[dict] = [] self.llm_calls: list[dict] = [] self.errors: list[dict] = [] self._current_task: dict = {} self._step_start: float = 0 def on_event(self, event: AgentEvent) -> None: if event.event_type == EventType.TASK_START: self._current_task = {"task_id": event.task_id, "start": event.timestamp, "steps": 0} elif event.event_type == EventType.TASK_END: self._current_task["end"] = event.timestamp self._current_task["duration_s"] = event.timestamp - self._current_task.get("start", event.timestamp) self._current_task.update(event.data) self.tasks.append(self._current_task) elif event.event_type == EventType.STEP_START: self._step_start = event.timestamp if self._current_task: self._current_task["steps"] = self._current_task.get("steps", 0) + 1 elif event.event_type == EventType.STATE_EVALUATED: self.steps.append({ "step": event.step, "task_id": event.task_id, "duration_s": event.timestamp - self._step_start if self._step_start else 0, **event.data, }) elif event.event_type == EventType.LLM_CALL_END: self.llm_calls.append(event.data) elif event.event_type == EventType.ERROR: self.errors.append({"step": event.step, **event.data}) def summary(self) -> dict[str, Any]: if not self.tasks: return {"tasks": 0} success_count = sum(1 for t in self.tasks if t.get("success_rate", 0) > 0.5) total_steps = sum(t.get("steps", 0) for t in self.tasks) total_duration = sum(t.get("duration_s", 0) for t in self.tasks) avg_phi_deltas = [s.get("delta", 0) for s in self.steps if "delta" in s] return { "total_tasks": len(self.tasks), "successful_tasks": success_count, "success_rate": success_count / len(self.tasks) if self.tasks else 0, "total_steps": total_steps, "avg_steps_per_task": total_steps / len(self.tasks), "total_duration_s": round(total_duration, 2), "avg_duration_per_task_s": round(total_duration / len(self.tasks), 2), "total_llm_calls": len(self.llm_calls), "total_errors": len(self.errors), "avg_phi_delta": round(sum(avg_phi_deltas) / len(avg_phi_deltas), 3) if avg_phi_deltas else 0, } # --------------------------------------------------------------------------- # Callback Manager — dispatches events to multiple callbacks # --------------------------------------------------------------------------- class CallbackManager: """ Manages multiple callbacks and dispatches events to all of them. Usage: mgr = CallbackManager() mgr.add(LoggingCallback()) mgr.add(MetricsCollector()) mgr.add(JSONFileCallback("events.jsonl")) mgr.emit(AgentEvent(EventType.TASK_START, data={"purpose": "..."})) """ def __init__(self, callbacks: list | None = None): self.callbacks: list = callbacks or [] def add(self, callback) -> "CallbackManager": self.callbacks.append(callback) return self def emit(self, event: AgentEvent) -> None: for cb in self.callbacks: try: cb.on_event(event) except Exception as e: logger.warning(f"Callback {type(cb).__name__} failed: {e}")