| """ |
| trace.py — Structured execution traces with JSONL persistence. |
| |
| Every Orchestrator step emits TraceEvents. A Trace is the full sequence |
| for one task. Traces are the raw material for memory extraction, debugging, |
| and evaluation. They are append-only and immutable once written. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import time |
| import uuid |
| from dataclasses import dataclass, field, asdict |
| from pathlib import Path |
| from typing import Any, Iterator |
|
|
# Logger for this module, named after the module itself (``__name__``).
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class TraceEvent:
    """One timestamped record emitted during a traced run.

    ``trace_id`` ties the event to its owning trace (empty when unset);
    ``event_id`` is a short random hex string generated per event.
    """

    kind: str
    step: int = 0
    # Wall-clock creation time (seconds since the epoch, from time.time()).
    timestamp: float = field(default_factory=time.time)
    # Free-form payload; a fresh dict per instance.
    data: dict[str, Any] = field(default_factory=dict)
    trace_id: str = ""
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:10])

    def to_dict(self) -> dict[str, Any]:
        """Return the event's fields as a plain dict, in declaration order.

        The ``data`` payload is included by reference, not copied.
        """
        ordered_keys = ("kind", "step", "timestamp", "data", "trace_id", "event_id")
        return {key: getattr(self, key) for key in ordered_keys}
|
|
|
|
@dataclass
class Trace:
    """
    A complete execution trace for one task run.

    Traces are meant to be treated as immutable once finalized. This is a
    convention, not enforced: ``emit`` still appends after ``finalize``.
    They can be saved to JSONL and reloaded for offline analysis, memory
    extraction, or replay.

    On-disk format written by ``save``: the first line is a header object
    tagged ``"_type": "trace_header"`` carrying the trace-level fields; every
    following line is one event as produced by ``TraceEvent.to_dict``.
    """

    # Value of "_type" marking the header record in a saved file. It is left
    # unannotated so the dataclass machinery does not turn it into a field.
    _HEADER_TYPE = "trace_header"

    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    purpose: str = ""
    run_mode: str = "learning_train"
    started_at: float = field(default_factory=time.time)
    # 0.0 means "not finalized yet" (see duration_s).
    finished_at: float = 0.0
    events: list[TraceEvent] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    def emit(self, kind: str, step: int = 0, **data: Any) -> TraceEvent:
        """Append a new event stamped with this trace's id and return it.

        Extra keyword arguments become the event's ``data`` payload.
        """
        event = TraceEvent(kind=kind, step=step, data=data, trace_id=self.trace_id)
        self.events.append(event)
        return event

    def finalize(self) -> None:
        """Mark the trace as complete by recording the current time."""
        self.finished_at = time.time()

    @property
    def duration_s(self) -> float:
        """Elapsed seconds; measured up to *now* while not yet finalized."""
        if self.finished_at > 0:
            return self.finished_at - self.started_at
        return time.time() - self.started_at

    @property
    def step_count(self) -> int:
        """Highest ``step`` value among the events (0 when there are none)."""
        return max((e.step for e in self.events), default=0)

    def save(self, path: str | Path) -> None:
        """Write the trace as JSONL: one header line, then one line per event.

        Parent directories are created as needed and an existing file is
        overwritten. Values that are not JSON-serializable are written via
        ``str()`` (``default=str``), so they do not round-trip through ``load``.
        """
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        header = {
            "_type": self._HEADER_TYPE,
            "trace_id": self.trace_id,
            "purpose": self.purpose,
            "run_mode": self.run_mode,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            "metadata": self.metadata,
        }
        # Explicit UTF-8 so the file does not depend on the platform locale.
        with target.open("w", encoding="utf-8") as f:
            f.write(json.dumps(header, default=str) + "\n")
            for event in self.events:
                f.write(json.dumps(event.to_dict(), default=str) + "\n")

    @classmethod
    def load(cls, path: str | Path) -> "Trace":
        """Load a trace from a JSONL file produced by ``save``.

        Blank lines are ignored. An empty file yields a fresh ``cls()``, which
        has a new random id and ``started_at`` set to now.

        Raises:
            OSError: the file cannot be opened or read.
            ValueError: a line is not valid JSON (``json.JSONDecodeError``)
                or is not a JSON object.
        """
        records = cls._read_records(path)
        if not records:
            return cls()

        # save() tags the header explicitly. A first record without an event
        # "kind" (events always carry one, see TraceEvent.to_dict) is also
        # treated as a header, preserving the old "first line is the header"
        # behaviour for untagged headers. Anything else is an event, so a
        # headerless file no longer loses its first event.
        first = records[0]
        has_header = first.get("_type") == cls._HEADER_TYPE or "kind" not in first
        header: dict[str, Any] = first if has_header else {}
        event_records = records[1:] if has_header else records

        trace = cls(
            trace_id=header.get("trace_id", ""),
            purpose=header.get("purpose", ""),
            run_mode=header.get("run_mode", "learning_train"),
            started_at=header.get("started_at", 0),
            finished_at=header.get("finished_at", 0),
            metadata=header.get("metadata", {}),
        )
        for record in event_records:
            trace.events.append(TraceEvent(
                kind=record.get("kind", ""),
                step=record.get("step", 0),
                timestamp=record.get("timestamp", 0),
                data=record.get("data", {}),
                # Events missing their own id inherit the owning trace's id.
                trace_id=record.get("trace_id", trace.trace_id),
                event_id=record.get("event_id", ""),
            ))
        return trace

    @staticmethod
    def _read_records(path: str | Path) -> list[dict[str, Any]]:
        """Parse every non-blank line of *path* as a JSON object."""
        records: list[dict[str, Any]] = []
        with open(path, encoding="utf-8") as f:
            for lineno, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                record = json.loads(line)
                if not isinstance(record, dict):
                    raise ValueError(
                        f"{path}:{lineno}: expected a JSON object, "
                        f"got {type(record).__name__}"
                    )
                records.append(record)
        return records

    @classmethod
    def load_many(cls, directory: str | Path, glob: str = "*.jsonl") -> list["Trace"]:
        """Load every trace in *directory* matching *glob*, in sorted path order.

        Best-effort: a file that fails to load is logged as a warning and
        skipped, so one bad file does not abort the whole batch.
        """
        traces: list[Trace] = []
        for p in sorted(Path(directory).glob(glob)):
            try:
                traces.append(cls.load(str(p)))
            except Exception as e:  # deliberate best-effort boundary
                logger.warning("Failed to load trace %s: %s", p, e)
        return traces
|
|