""" trace.py — Structured execution traces with JSONL persistence. Every Orchestrator step emits TraceEvents. A Trace is the full sequence for one task. Traces are the raw material for memory extraction, debugging, and evaluation. They are append-only and immutable once written. """ from __future__ import annotations import json import logging import time import uuid from dataclasses import dataclass, field, asdict from pathlib import Path from typing import Any, Iterator logger = logging.getLogger(__name__) @dataclass class TraceEvent: """A single event in an execution trace.""" kind: str # "action", "score", "tool_call", "tool_result", "error", "memory_read", "memory_write" step: int = 0 timestamp: float = field(default_factory=time.time) data: dict[str, Any] = field(default_factory=dict) trace_id: str = "" event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:10]) def to_dict(self) -> dict[str, Any]: return { "kind": self.kind, "step": self.step, "timestamp": self.timestamp, "data": self.data, "trace_id": self.trace_id, "event_id": self.event_id, } @dataclass class Trace: """ A complete execution trace for one task run. Traces are immutable after finalization. They can be saved to JSONL and reloaded for offline analysis, memory extraction, or replay. """ trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) purpose: str = "" run_mode: str = "learning_train" started_at: float = field(default_factory=time.time) finished_at: float = 0.0 events: list[TraceEvent] = field(default_factory=list) metadata: dict[str, Any] = field(default_factory=dict) def emit(self, kind: str, step: int = 0, **data) -> TraceEvent: """Add a new event to the trace.""" event = TraceEvent( kind=kind, step=step, data=data, trace_id=self.trace_id, ) self.events.append(event) return event def finalize(self) -> None: """Mark the trace as complete.""" self.finished_at = time.time() @property def duration_s(self) -> float: if self.finished_at > 0: return self.finished_at - self.started_at return time.time() - self.started_at @property def step_count(self) -> int: return max((e.step for e in self.events), default=0) # --- JSONL persistence --- def save(self, path: str) -> None: """Save trace as JSONL (one event per line + header).""" Path(path).parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: # Header line header = { "_type": "trace_header", "trace_id": self.trace_id, "purpose": self.purpose, "run_mode": self.run_mode, "started_at": self.started_at, "finished_at": self.finished_at, "metadata": self.metadata, } f.write(json.dumps(header, default=str) + "\n") for event in self.events: f.write(json.dumps(event.to_dict(), default=str) + "\n") @classmethod def load(cls, path: str) -> "Trace": """Load a trace from JSONL file.""" with open(path) as f: lines = [json.loads(line) for line in f if line.strip()] if not lines: return cls() header = lines[0] trace = cls( trace_id=header.get("trace_id", ""), purpose=header.get("purpose", ""), run_mode=header.get("run_mode", "learning_train"), started_at=header.get("started_at", 0), finished_at=header.get("finished_at", 0), metadata=header.get("metadata", {}), ) for line in lines[1:]: trace.events.append(TraceEvent( kind=line.get("kind", ""), step=line.get("step", 0), timestamp=line.get("timestamp", 0), data=line.get("data", {}), trace_id=line.get("trace_id", ""), event_id=line.get("event_id", ""), )) return trace @classmethod def load_many(cls, directory: str, glob: str = "*.jsonl") -> list["Trace"]: """Load all traces from a directory.""" traces = [] for p in sorted(Path(directory).glob(glob)): try: traces.append(cls.load(str(p))) except Exception as e: logger.warning(f"Failed to load trace {p}: {e}") return traces