# Page header captured with this file (not part of the module):
#   Rohan03 | "V2 merge: purpose_agent/trace.py" | commit 7498b52 (verified)
"""
trace.py — Structured execution traces with JSONL persistence.
Every Orchestrator step emits TraceEvents. A Trace is the full sequence
for one task. Traces are the raw material for memory extraction, debugging,
and evaluation. They are append-only and immutable once written.
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Iterator
# Module-level logger; Trace.load_many uses it to warn about files that fail to load.
logger = logging.getLogger(__name__)
@dataclass
class TraceEvent:
    """One record in an execution trace.

    Only ``kind`` is required. ``timestamp`` defaults to the current
    ``time.time()`` value and ``event_id`` to the first 10 hex characters
    of a fresh UUID4, both evaluated when the event is constructed.
    """

    # Event category, e.g. "action", "score", "tool_call", "tool_result",
    # "error", "memory_read", "memory_write".
    kind: str
    step: int = 0
    timestamp: float = field(default_factory=time.time)
    # Free-form payload attached to the event.
    data: dict[str, Any] = field(default_factory=dict)
    # Identifier of the trace this event belongs to ("" when unset).
    trace_id: str = ""
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:10])

    def to_dict(self) -> dict[str, Any]:
        """Return the event's fields as a plain dict, in declaration order.

        The ``data`` value is the event's own dict, not a copy.
        """
        exported = ("kind", "step", "timestamp", "data", "trace_id", "event_id")
        return {name: getattr(self, name) for name in exported}
@dataclass
class Trace:
    """
    A complete execution trace for one task run.

    A trace holds header fields (id, purpose, run mode, start/finish times,
    metadata) plus an ordered list of TraceEvents. It can be saved to JSONL
    and reloaded for offline analysis, memory extraction, or replay.

    Traces are meant to be treated as immutable after finalization; this is
    a convention only -- emit() does not check whether finalize() was called.
    """

    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    purpose: str = ""
    run_mode: str = "learning_train"
    started_at: float = field(default_factory=time.time)
    # 0.0 means "not finalized yet" (see duration_s).
    finished_at: float = 0.0
    events: list[TraceEvent] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    def emit(self, kind: str, step: int = 0, **data) -> TraceEvent:
        """Create an event stamped with this trace's id, append it, and return it.

        Args:
            kind: Event category stored on the event.
            step: Step number the event belongs to.
            **data: Arbitrary keyword payload stored as the event's ``data``.
        """
        event = TraceEvent(
            kind=kind, step=step, data=data, trace_id=self.trace_id,
        )
        self.events.append(event)
        return event

    def finalize(self) -> None:
        """Mark the trace as complete by recording the current time."""
        self.finished_at = time.time()

    @property
    def duration_s(self) -> float:
        """Elapsed seconds since ``started_at``.

        Measured up to ``finished_at`` when it is set (> 0), otherwise up to
        the current time.
        """
        if self.finished_at > 0:
            return self.finished_at - self.started_at
        return time.time() - self.started_at

    @property
    def step_count(self) -> int:
        """Highest ``step`` value among the events, or 0 when there are none."""
        return max((e.step for e in self.events), default=0)

    # --- JSONL persistence ---

    def save(self, path: str) -> None:
        """Save the trace as JSONL: one header line, then one line per event.

        Missing parent directories are created and an existing file at
        *path* is overwritten. Values the json module cannot encode natively
        are stringified (``default=str``).
        """
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        header = {
            "_type": "trace_header",
            "trace_id": self.trace_id,
            "purpose": self.purpose,
            "run_mode": self.run_mode,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            "metadata": self.metadata,
        }
        # Explicit UTF-8 so the file does not depend on the platform's
        # default text encoding.
        with target.open("w", encoding="utf-8") as f:
            f.write(json.dumps(header, default=str) + "\n")
            for event in self.events:
                f.write(json.dumps(event.to_dict(), default=str) + "\n")

    @classmethod
    def load(cls, path: str) -> "Trace":
        """Load a trace from a JSONL file.

        Blank lines are skipped. A file with no rows yields a fresh default
        trace. The first row is used as the header unless it looks like an
        event (it has a "kind" key and is not tagged "trace_header"); in that
        case every row is loaded as an event and the trace id is taken from
        the first event, so no event is silently dropped.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        with open(path, encoding="utf-8") as f:
            rows = [json.loads(line) for line in f if line.strip()]
        if not rows:
            return cls()

        first = rows[0]
        if first.get("_type") == "trace_header" or "kind" not in first:
            header, event_rows = first, rows[1:]
        else:
            # Headerless file: keep the first row as an event.
            header, event_rows = {"trace_id": first.get("trace_id", "")}, rows

        trace = cls(
            trace_id=header.get("trace_id", ""),
            purpose=header.get("purpose", ""),
            run_mode=header.get("run_mode", "learning_train"),
            started_at=header.get("started_at", 0),
            finished_at=header.get("finished_at", 0),
            # A JSON null would otherwise leave metadata as None.
            metadata=header.get("metadata") or {},
        )
        trace.events.extend(
            TraceEvent(
                kind=row.get("kind", ""),
                step=row.get("step", 0),
                timestamp=row.get("timestamp", 0),
                data=row.get("data", {}),
                trace_id=row.get("trace_id", ""),
                event_id=row.get("event_id", ""),
            )
            for row in event_rows
        )
        return trace

    @classmethod
    def load_many(cls, directory: str, glob: str = "*.jsonl") -> list["Trace"]:
        """Load every trace in *directory* matching *glob*, in sorted path order.

        Best-effort: a file that fails to load is logged as a warning and
        skipped, so one bad file does not abort the whole batch.
        """
        traces = []
        for p in sorted(Path(directory).glob(glob)):
            try:
                traces.append(cls.load(str(p)))
            except Exception as e:  # deliberate catch-all at the batch boundary
                logger.warning("Failed to load trace %s: %s", p, e)
        return traces