purpose_agent/observability.py
"""
Observability — Callback hooks, cost tracking, and span tracing.
Provides production-grade visibility into agent runs:
- Token & cost tracking per step, per task, per agent
- Callback hooks for custom integrations (LangSmith, Arize, custom dashboards)
- Structured event logging
- Performance profiling
Lightweight — no external dependencies. Integrates with OpenTelemetry-compatible
systems via the callback interface.
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Protocol
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cost Tracking
# ---------------------------------------------------------------------------
# Approximate cost in USD per 1M input tokens for common models
MODEL_COSTS_PER_1M_INPUT = {
# Cloud LLMs
"gpt-4o": 2.50,
"gpt-4o-mini": 0.15,
"claude-3-5-sonnet": 3.00,
"claude-3-5-haiku": 0.80,
# Cloud SLMs via inference providers
"qwen/qwen3-32b": 0.20,
"qwen/qwen3-8b": 0.05,
"meta-llama/llama-3.1-8b-instruct": 0.05,
# Local models (electricity cost estimate per 1M tokens)
"local-gpu": 0.01, # ~$0.01/1M tokens on consumer GPU
"local-cpu": 0.005, # ~$0.005/1M tokens on CPU
"ollama": 0.005, # Estimate for local Ollama
}
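# Worked example of the pricing unit above: a 500-token prompt to "gpt-4o"
# costs roughly 500 * 2.50 / 1_000_000 = $0.00125 on the input side.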
@dataclass
class TokenUsage:
"""Token usage for a single LLM call."""
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
model: str = ""
estimated_cost_usd: float = 0.0
timestamp: float = field(default_factory=time.time)
@dataclass
class CostTracker:
"""
Tracks token usage and estimated costs across all LLM calls.
Usage:
tracker = CostTracker(model_name="qwen3:1.7b", cost_per_1m=0.005)
tracker.record(prompt_tokens=500, completion_tokens=200)
print(tracker.summary())
"""
model_name: str = "unknown"
cost_per_1m_input: float = 0.01
cost_per_1m_output: float = 0.02
calls: list[TokenUsage] = field(default_factory=list)
def record(
self,
prompt_tokens: int = 0,
completion_tokens: int = 0,
model: str | None = None,
) -> TokenUsage:
"""Record a single LLM call."""
total = prompt_tokens + completion_tokens
cost = (
prompt_tokens * self.cost_per_1m_input / 1_000_000
+ completion_tokens * self.cost_per_1m_output / 1_000_000
)
usage = TokenUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total,
model=model or self.model_name,
estimated_cost_usd=cost,
)
self.calls.append(usage)
return usage
@property
def total_tokens(self) -> int:
return sum(c.total_tokens for c in self.calls)
@property
def total_cost_usd(self) -> float:
return sum(c.estimated_cost_usd for c in self.calls)
@property
def total_calls(self) -> int:
return len(self.calls)
def summary(self) -> dict[str, Any]:
return {
"model": self.model_name,
"total_calls": self.total_calls,
"total_tokens": self.total_tokens,
"prompt_tokens": sum(c.prompt_tokens for c in self.calls),
"completion_tokens": sum(c.completion_tokens for c in self.calls),
"estimated_cost_usd": round(self.total_cost_usd, 6),
}
    def reset(self) -> None:
self.calls.clear()
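# Illustrative glue between the pricing table and the tracker. This helper is
# a sketch, not part of the original API; the 2x output multiplier is an
# assumption, not a published rate.
def tracker_for_model(model: str) -> CostTracker:
    """Build a CostTracker seeded from MODEL_COSTS_PER_1M_INPUT."""
    input_cost = MODEL_COSTS_PER_1M_INPUT.get(model, 0.01)
    return CostTracker(
        model_name=model,
        cost_per_1m_input=input_cost,
        cost_per_1m_output=input_cost * 2,  # assumed: output ~2x input pricing
    )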
# ---------------------------------------------------------------------------
# Callback System
# ---------------------------------------------------------------------------
class EventType(Enum):
"""Events emitted during agent execution."""
TASK_START = "task_start"
TASK_END = "task_end"
STEP_START = "step_start"
STEP_END = "step_end"
ACTION_DECIDED = "action_decided"
TOOL_CALLED = "tool_called"
TOOL_RESULT = "tool_result"
STATE_EVALUATED = "state_evaluated"
LLM_CALL_START = "llm_call_start"
LLM_CALL_END = "llm_call_end"
HEURISTIC_LEARNED = "heuristic_learned"
MEMORY_UPDATED = "memory_updated"
OPTIMIZATION_START = "optimization_start"
OPTIMIZATION_END = "optimization_end"
ERROR = "error"
CHECKPOINT = "checkpoint"
HUMAN_INPUT_REQUESTED = "human_input_requested"
HUMAN_INPUT_RECEIVED = "human_input_received"
@dataclass
class AgentEvent:
"""A structured event emitted during agent execution."""
event_type: EventType
data: dict[str, Any] = field(default_factory=dict)
step: int = 0
task_id: str = ""
agent_id: str = ""
timestamp: float = field(default_factory=time.time)
duration_s: float = 0.0
def to_dict(self) -> dict[str, Any]:
return {
"event": self.event_type.value,
"data": self.data,
"step": self.step,
"task_id": self.task_id,
"agent_id": self.agent_id,
"timestamp": self.timestamp,
"duration_s": self.duration_s,
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), default=str)
class AgentCallback(Protocol):
"""Protocol for agent callbacks. Implement this to integrate with external systems."""
def on_event(self, event: AgentEvent) -> None:
"""Called when an event occurs during agent execution."""
...
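# A minimal sketch of a custom callback implementing the AgentCallback
# protocol. It assumes LLM_CALL_END events carry "prompt_tokens" and
# "completion_tokens" in their data payload; adjust the keys to whatever
# your agent actually emits.
class CostTrackingCallback:
    """Feeds LLM call events into a CostTracker (illustrative sketch)."""
    def __init__(self, tracker: CostTracker):
        self.tracker = tracker
    def on_event(self, event: AgentEvent) -> None:
        if event.event_type == EventType.LLM_CALL_END:
            self.tracker.record(
                prompt_tokens=int(event.data.get("prompt_tokens", 0)),
                completion_tokens=int(event.data.get("completion_tokens", 0)),
            )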
class LoggingCallback:
"""Simple callback that logs all events."""
def __init__(self, level: int = logging.INFO):
self.level = level
self.events: list[AgentEvent] = []
def on_event(self, event: AgentEvent) -> None:
self.events.append(event)
logger.log(
self.level,
f"[{event.event_type.value}] step={event.step} "
f"task={event.task_id} {json.dumps(event.data, default=str)[:200]}",
)
class JSONFileCallback:
"""Callback that writes events to a JSON Lines file."""
def __init__(self, path: str):
self.path = path
    def on_event(self, event: AgentEvent) -> None:
        # Append one event per line (JSON Lines); the file is opened per event
        # so interrupted runs still leave a readable trace.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(event.to_json() + "\n")
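# Companion sketch for reading the JSONL stream back (hypothetical helper,
# not part of the original module):
def load_events(path: str) -> list[dict[str, Any]]:
    """Parse a JSON Lines event file written by JSONFileCallback."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]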
class MetricsCollector:
"""
Callback that collects aggregate metrics for analysis.
Usage:
collector = MetricsCollector()
# ... run tasks with collector as callback ...
print(collector.summary())
"""
def __init__(self):
self.tasks: list[dict] = []
self.steps: list[dict] = []
self.llm_calls: list[dict] = []
self.errors: list[dict] = []
self._current_task: dict = {}
self._step_start: float = 0
def on_event(self, event: AgentEvent) -> None:
if event.event_type == EventType.TASK_START:
self._current_task = {"task_id": event.task_id, "start": event.timestamp, "steps": 0}
        elif event.event_type == EventType.TASK_END:
            self._current_task["end"] = event.timestamp
            self._current_task["duration_s"] = event.timestamp - self._current_task.get("start", event.timestamp)
            self._current_task.update(event.data)
            self.tasks.append(self._current_task)
            self._current_task = {}  # reset so later events can't mutate the stored record
elif event.event_type == EventType.STEP_START:
self._step_start = event.timestamp
if self._current_task:
self._current_task["steps"] = self._current_task.get("steps", 0) + 1
elif event.event_type == EventType.STATE_EVALUATED:
self.steps.append({
"step": event.step,
"task_id": event.task_id,
"duration_s": event.timestamp - self._step_start if self._step_start else 0,
**event.data,
})
elif event.event_type == EventType.LLM_CALL_END:
self.llm_calls.append(event.data)
elif event.event_type == EventType.ERROR:
self.errors.append({"step": event.step, **event.data})
def summary(self) -> dict[str, Any]:
if not self.tasks:
return {"tasks": 0}
        # A task counts as successful when its TASK_END payload reports a
        # success_rate above 0.5.
        success_count = sum(1 for t in self.tasks if t.get("success_rate", 0) > 0.5)
        total_steps = sum(t.get("steps", 0) for t in self.tasks)
        total_duration = sum(t.get("duration_s", 0) for t in self.tasks)
        phi_deltas = [s.get("delta", 0) for s in self.steps if "delta" in s]
return {
"total_tasks": len(self.tasks),
"successful_tasks": success_count,
"success_rate": success_count / len(self.tasks) if self.tasks else 0,
"total_steps": total_steps,
"avg_steps_per_task": total_steps / len(self.tasks),
"total_duration_s": round(total_duration, 2),
"avg_duration_per_task_s": round(total_duration / len(self.tasks), 2),
"total_llm_calls": len(self.llm_calls),
"total_errors": len(self.errors),
"avg_phi_delta": round(sum(avg_phi_deltas) / len(avg_phi_deltas), 3) if avg_phi_deltas else 0,
}
# ---------------------------------------------------------------------------
# Callback Manager — dispatches events to multiple callbacks
# ---------------------------------------------------------------------------
class CallbackManager:
"""
Manages multiple callbacks and dispatches events to all of them.
Usage:
mgr = CallbackManager()
mgr.add(LoggingCallback())
mgr.add(MetricsCollector())
mgr.add(JSONFileCallback("events.jsonl"))
mgr.emit(AgentEvent(EventType.TASK_START, data={"purpose": "..."}))
"""
    def __init__(self, callbacks: list[AgentCallback] | None = None):
        self.callbacks: list[AgentCallback] = callbacks or []
    def add(self, callback: AgentCallback) -> "CallbackManager":
        self.callbacks.append(callback)
        return self
def emit(self, event: AgentEvent) -> None:
for cb in self.callbacks:
try:
cb.on_event(event)
except Exception as e:
logger.warning(f"Callback {type(cb).__name__} failed: {e}")