| """ |
| Observability — Callback hooks, cost tracking, and span tracing. |
| |
| Provides production-grade visibility into agent runs: |
| - Token & cost tracking per step, per task, per agent |
| - Callback hooks for custom integrations (LangSmith, Arize, custom dashboards) |
| - Structured event logging |
| - Performance profiling |
| |
| Lightweight — no external dependencies. Integrates with OpenTelemetry-compatible |
| systems via the callback interface. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Any, Callable, Protocol |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
| |
# Rough USD price per 1M *input* (prompt) tokens, keyed by model identifier.
# NOTE(review): these are point-in-time estimates; output-token rates are not
# listed here — confirm against current provider pricing before relying on
# them for anything billing-related.
MODEL_COSTS_PER_1M_INPUT = {
    # Hosted API models
    "gpt-4o": 2.50,
    "gpt-4o-mini": 0.15,
    "claude-3-5-sonnet": 3.00,
    "claude-3-5-haiku": 0.80,
    # Open-weight models via hosted providers
    "qwen/qwen3-32b": 0.20,
    "qwen/qwen3-8b": 0.05,
    "meta-llama/llama-3.1-8b-instruct": 0.05,
    # Self-hosted inference — nominal placeholder rates (presumably amortized
    # compute cost; confirm with whoever operates the deployment)
    "local-gpu": 0.01,
    "local-cpu": 0.005,
    "ollama": 0.005,
}
|
|
|
|
@dataclass
class TokenUsage:
    """Token usage for a single LLM call."""
    prompt_tokens: int = 0        # tokens sent in the request/prompt
    completion_tokens: int = 0    # tokens generated in the response
    total_tokens: int = 0         # prompt + completion (computed by the caller, e.g. CostTracker.record)
    model: str = ""               # model identifier the call was attributed to
    estimated_cost_usd: float = 0.0   # cost estimate (computed by CostTracker.record)
    timestamp: float = field(default_factory=time.time)   # wall-clock time this record was created
|
|
|
|
@dataclass
class CostTracker:
    """
    Tracks token usage and estimated costs across all LLM calls.

    Costs are computed from per-million-token rates, with separate rates for
    prompt (input) and completion (output) tokens.

    Usage:
        tracker = CostTracker(model_name="qwen3:1.7b", cost_per_1m_input=0.005)
        tracker.record(prompt_tokens=500, completion_tokens=200)
        print(tracker.summary())
    """
    model_name: str = "unknown"       # default model attributed to recorded calls
    cost_per_1m_input: float = 0.01   # USD per 1M prompt tokens
    cost_per_1m_output: float = 0.02  # USD per 1M completion tokens
    calls: list[TokenUsage] = field(default_factory=list)  # one entry per recorded call


    def record(
        self,
        prompt_tokens: int = 0,
        completion_tokens: int = 0,
        model: str | None = None,
    ) -> TokenUsage:
        """Record a single LLM call and return the resulting TokenUsage.

        ``model`` overrides the attributed model name only; the tracker's own
        rates are used for the cost estimate regardless.
        """
        total = prompt_tokens + completion_tokens
        # Rates are quoted per 1M tokens, so scale each term down by 1e6.
        cost = (
            prompt_tokens * self.cost_per_1m_input / 1_000_000
            + completion_tokens * self.cost_per_1m_output / 1_000_000
        )


        usage = TokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total,
            model=model or self.model_name,
            estimated_cost_usd=cost,
        )
        self.calls.append(usage)
        return usage


    @property
    def total_tokens(self) -> int:
        # Prompt + completion tokens summed over every recorded call.
        return sum(c.total_tokens for c in self.calls)


    @property
    def total_cost_usd(self) -> float:
        # Total estimated spend in USD over every recorded call.
        return sum(c.estimated_cost_usd for c in self.calls)


    @property
    def total_calls(self) -> int:
        # Number of calls recorded since construction (or the last reset()).
        return len(self.calls)


    def summary(self) -> dict[str, Any]:
        """Return an aggregate usage/cost summary as a plain dict."""
        return {
            "model": self.model_name,
            "total_calls": self.total_calls,
            "total_tokens": self.total_tokens,
            "prompt_tokens": sum(c.prompt_tokens for c in self.calls),
            "completion_tokens": sum(c.completion_tokens for c in self.calls),
            "estimated_cost_usd": round(self.total_cost_usd, 6),
        }


    def reset(self) -> None:
        """Discard all recorded calls."""
        self.calls.clear()
|
|
|
|
| |
| |
| |
|
|
class EventType(Enum):
    """Events emitted during agent execution."""
    # Task lifecycle
    TASK_START = "task_start"
    TASK_END = "task_end"
    # Step lifecycle
    STEP_START = "step_start"
    STEP_END = "step_end"
    # Decision-making and tool use
    ACTION_DECIDED = "action_decided"
    TOOL_CALLED = "tool_called"
    TOOL_RESULT = "tool_result"
    STATE_EVALUATED = "state_evaluated"
    # LLM call boundaries
    LLM_CALL_START = "llm_call_start"
    LLM_CALL_END = "llm_call_end"
    # Learning / memory / optimization
    HEURISTIC_LEARNED = "heuristic_learned"
    MEMORY_UPDATED = "memory_updated"
    OPTIMIZATION_START = "optimization_start"
    OPTIMIZATION_END = "optimization_end"
    # Errors and checkpoints
    ERROR = "error"
    CHECKPOINT = "checkpoint"
    # Human-in-the-loop
    HUMAN_INPUT_REQUESTED = "human_input_requested"
    HUMAN_INPUT_RECEIVED = "human_input_received"
|
|
|
|
@dataclass
class AgentEvent:
    """One structured record in the agent's event stream."""
    event_type: EventType                                # what happened
    data: dict[str, Any] = field(default_factory=dict)   # event-specific payload
    step: int = 0                                        # step index within the task
    task_id: str = ""
    agent_id: str = ""
    timestamp: float = field(default_factory=time.time)  # creation time by default
    duration_s: float = 0.0                              # how long the event's work took


    def to_dict(self) -> dict[str, Any]:
        """Flatten the event into a plain dict (enum replaced by its value)."""
        out: dict[str, Any] = {"event": self.event_type.value}
        out["data"] = self.data
        out["step"] = self.step
        out["task_id"] = self.task_id
        out["agent_id"] = self.agent_id
        out["timestamp"] = self.timestamp
        out["duration_s"] = self.duration_s
        return out


    def to_json(self) -> str:
        """Serialize via to_dict(); non-JSON-native values fall back to str()."""
        return json.dumps(self.to_dict(), default=str)
|
|
|
|
class AgentCallback(Protocol):
    """Protocol for agent callbacks. Implement this to integrate with external systems.

    Structural typing: any object with a matching ``on_event`` method
    satisfies this protocol — no subclassing required.
    """


    def on_event(self, event: AgentEvent) -> None:
        """Called when an event occurs during agent execution.

        Implementations should be fast and avoid raising; a raised exception
        is logged and swallowed by the dispatcher, not propagated.
        """
        ...
|
|
|
|
class LoggingCallback:
    """Callback that retains every event and mirrors it to the module logger."""


    def __init__(self, level: int = logging.INFO):
        self.level = level                  # logging level used for each emission
        self.events: list[AgentEvent] = []  # all events seen, in arrival order


    def on_event(self, event: AgentEvent) -> None:
        """Store the event, then log a one-line summary (payload capped at 200 chars)."""
        self.events.append(event)
        payload = json.dumps(event.data, default=str)[:200]
        message = (
            f"[{event.event_type.value}] step={event.step} "
            f"task={event.task_id} {payload}"
        )
        logger.log(self.level, message)
|
|
|
|
class JSONFileCallback:
    """Callback that appends each event as one line of JSON (JSON Lines)."""


    def __init__(self, path: str):
        # Target file; created on first write, appended to thereafter.
        self.path = path


    def on_event(self, event: AgentEvent) -> None:
        """Serialize the event and append it, newline-terminated."""
        line = event.to_json()
        with open(self.path, "a") as sink:
            sink.write(f"{line}\n")
|
|
|
|
class MetricsCollector:
    """
    Callback that collects aggregate metrics for analysis.

    Accumulates per-task, per-step, LLM-call, and error records from the
    event stream, then produces an aggregate report via ``summary()``.

    Usage:
        collector = MetricsCollector()
        # ... run tasks with collector as callback ...
        print(collector.summary())
    """


    def __init__(self):
        self.tasks: list[dict] = []      # one record per completed task
        self.steps: list[dict] = []      # one record per STATE_EVALUATED event
        self.llm_calls: list[dict] = []  # raw LLM_CALL_END payloads
        self.errors: list[dict] = []     # raw ERROR payloads, tagged with step
        self._current_task: dict = {}    # task being accumulated between TASK_START/TASK_END
        self._step_start: float = 0      # timestamp of the most recent STEP_START


    def on_event(self, event: AgentEvent) -> None:
        """Route a single event into the appropriate accumulator."""
        if event.event_type == EventType.TASK_START:
            self._current_task = {"task_id": event.task_id, "start": event.timestamp, "steps": 0}
        elif event.event_type == EventType.TASK_END:
            # Tolerate a TASK_END without a matching TASK_START so the
            # record still carries a task_id and a start timestamp.
            if not self._current_task:
                self._current_task = {"task_id": event.task_id, "start": event.timestamp, "steps": 0}
            self._current_task["end"] = event.timestamp
            self._current_task["duration_s"] = event.timestamp - self._current_task.get("start", event.timestamp)
            self._current_task.update(event.data)
            self.tasks.append(self._current_task)
            # Reset accumulator state: otherwise stray events arriving after
            # TASK_END would mutate the record that was just appended, and a
            # stale _step_start would leak into the next task's step timings.
            self._current_task = {}
            self._step_start = 0
        elif event.event_type == EventType.STEP_START:
            self._step_start = event.timestamp
            if self._current_task:
                self._current_task["steps"] = self._current_task.get("steps", 0) + 1
        elif event.event_type == EventType.STATE_EVALUATED:
            self.steps.append({
                "step": event.step,
                "task_id": event.task_id,
                # Duration is 0 when no STEP_START has been seen yet.
                "duration_s": event.timestamp - self._step_start if self._step_start else 0,
                **event.data,
            })
        elif event.event_type == EventType.LLM_CALL_END:
            self.llm_calls.append(event.data)
        elif event.event_type == EventType.ERROR:
            self.errors.append({"step": event.step, **event.data})


    def summary(self) -> dict[str, Any]:
        """Return aggregate metrics across all completed tasks.

        A task counts as successful when its reported ``success_rate``
        (supplied via TASK_END event data) exceeds 0.5. Returns
        ``{"tasks": 0}`` when no task has completed yet.
        """
        if not self.tasks:
            return {"tasks": 0}


        success_count = sum(1 for t in self.tasks if t.get("success_rate", 0) > 0.5)
        total_steps = sum(t.get("steps", 0) for t in self.tasks)
        total_duration = sum(t.get("duration_s", 0) for t in self.tasks)
        phi_deltas = [s.get("delta", 0) for s in self.steps if "delta" in s]


        return {
            "total_tasks": len(self.tasks),
            "successful_tasks": success_count,
            # len(self.tasks) > 0 is guaranteed by the guard above.
            "success_rate": success_count / len(self.tasks),
            "total_steps": total_steps,
            "avg_steps_per_task": total_steps / len(self.tasks),
            "total_duration_s": round(total_duration, 2),
            "avg_duration_per_task_s": round(total_duration / len(self.tasks), 2),
            "total_llm_calls": len(self.llm_calls),
            "total_errors": len(self.errors),
            "avg_phi_delta": round(sum(phi_deltas) / len(phi_deltas), 3) if phi_deltas else 0,
        }
|
|
|
|
| |
| |
| |
|
|
class CallbackManager:
    """
    Fan-out dispatcher: forwards each event to every registered callback.

    A failing callback is logged and skipped so that one misbehaving
    integration cannot break the others.

    Usage:
        mgr = CallbackManager()
        mgr.add(LoggingCallback())
        mgr.add(MetricsCollector())
        mgr.add(JSONFileCallback("events.jsonl"))

        mgr.emit(AgentEvent(EventType.TASK_START, data={"purpose": "..."}))
    """


    def __init__(self, callbacks: list | None = None):
        # Registered callbacks, dispatched in insertion order.
        self.callbacks: list = callbacks or []


    def add(self, callback) -> "CallbackManager":
        """Register a callback; returns self so calls can be chained."""
        self.callbacks.append(callback)
        return self


    def emit(self, event: AgentEvent) -> None:
        """Dispatch *event* to every callback, isolating individual failures."""
        for target in self.callbacks:
            try:
                target.on_event(event)
            except Exception as err:
                # Swallow so the remaining callbacks still receive the event.
                logger.warning(f"Callback {type(target).__name__} failed: {err}")
|
|