"""
trace.py — Structured execution traces with JSONL persistence.
Every Orchestrator step emits TraceEvents. A Trace is the full sequence
for one task. Traces are the raw material for memory extraction, debugging,
and evaluation. They are append-only and immutable once written.
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Iterator
# Module-level logger named after this module; Trace.load_many() uses it to
# warn about trace files that fail to load.
logger = logging.getLogger(__name__)
@dataclass
class TraceEvent:
    """A single event in an execution trace.

    Every field except ``kind`` has a default, so an event can be built from
    just a kind and filled in by the caller.
    """

    # Event category, e.g. "action", "score", "tool_call", "tool_result",
    # "error", "memory_read", "memory_write".
    kind: str
    # Step number the event belongs to (0 when not specified).
    step: int = 0
    # Creation time as returned by time.time() (seconds since the epoch).
    timestamp: float = field(default_factory=time.time)
    # Free-form payload attached to the event.
    data: dict[str, Any] = field(default_factory=dict)
    # Identifier of the owning trace; empty when the event is unattached.
    trace_id: str = ""
    # Short random identifier: first 10 hex characters of a UUID4.
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:10])

    def to_dict(self) -> dict[str, Any]:
        """Return the event as a plain dict, one key per field.

        The ``data`` payload is snapshotted (top-level copy) so that callers
        who modify the returned dict cannot alter the recorded event. Nested
        values inside ``data`` are still shared.

        Returns:
            A dict with the keys ``kind``, ``step``, ``timestamp``, ``data``,
            ``trace_id`` and ``event_id``, in that order.
        """
        payload = self.data
        if isinstance(payload, dict):
            # Copy only real dicts; any other value (e.g. None from a loaded
            # file) is passed through unchanged, as before.
            payload = dict(payload)
        return {
            "kind": self.kind,
            "step": self.step,
            "timestamp": self.timestamp,
            "data": payload,
            "trace_id": self.trace_id,
            "event_id": self.event_id,
        }
@dataclass
class Trace:
    """A complete execution trace for one task run.

    A trace is an ordered list of TraceEvents plus a small header (id,
    purpose, run mode, timing, metadata). It can be saved to JSONL and
    reloaded for offline analysis, memory extraction, or replay.

    Traces are meant to be treated as immutable after finalization; this
    class does not enforce that (``emit`` still appends after ``finalize``).
    """

    # Short random identifier: first 12 hex characters of a UUID4.
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    # Free-form description of what the run was for.
    purpose: str = ""
    # Run mode label; defaults to "learning_train".
    run_mode: str = "learning_train"
    # Creation time as returned by time.time() (seconds since the epoch).
    started_at: float = field(default_factory=time.time)
    # Completion time; 0.0 means the trace has not been finalized.
    finished_at: float = 0.0
    # Events in emission order.
    events: list[TraceEvent] = field(default_factory=list)
    # Free-form header metadata, persisted alongside the trace.
    metadata: dict[str, Any] = field(default_factory=dict)

    def emit(self, kind: str, step: int = 0, **data: Any) -> TraceEvent:
        """Append a new event to the trace and return it.

        Args:
            kind: Event category (see TraceEvent.kind).
            step: Step number the event belongs to.
            **data: Arbitrary keyword payload stored as the event's ``data``.

        Returns:
            The newly created event, already stamped with this trace's id.
        """
        event = TraceEvent(
            kind=kind, step=step, data=data, trace_id=self.trace_id,
        )
        self.events.append(event)
        return event

    def finalize(self) -> None:
        """Mark the trace as complete by recording the current time."""
        self.finished_at = time.time()

    @property
    def duration_s(self) -> float:
        """Elapsed seconds: fixed once finalized, otherwise still running."""
        if self.finished_at > 0:
            return self.finished_at - self.started_at
        return time.time() - self.started_at

    @property
    def step_count(self) -> int:
        """Highest step number seen among the events (0 when there are none)."""
        return max((e.step for e in self.events), default=0)

    # --- JSONL persistence ---

    def save(self, path: str | Path) -> None:
        """Save the trace as JSONL: one header line, then one line per event.

        Everything is serialized before the file is opened, so a
        serialization error (for example a dict with non-string keys) cannot
        leave a truncated file behind or clobber an existing one. Values that
        JSON cannot encode natively are stringified via ``default=str``.

        Args:
            path: Destination file; missing parent directories are created.

        Raises:
            TypeError, ValueError: If the header or an event cannot be
                serialized to JSON.
            OSError: If the directory or file cannot be written.
        """
        header = {
            "_type": "trace_header",
            "trace_id": self.trace_id,
            "purpose": self.purpose,
            "run_mode": self.run_mode,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            "metadata": self.metadata,
        }
        lines = [json.dumps(header, default=str) + "\n"]
        lines.extend(
            json.dumps(event.to_dict(), default=str) + "\n"
            for event in self.events
        )

        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding keeps the file format independent of the platform
        # default locale.
        with target.open("w", encoding="utf-8") as f:
            f.writelines(lines)

    @classmethod
    def load(cls, path: str | Path) -> "Trace":
        """Load a trace from a JSONL file written by ``save``.

        Blank lines are ignored. An empty file yields a fresh default trace.
        The first record is normally the header; if it instead looks like an
        event (it has a ``kind`` key and is not marked ``trace_header``), it
        is kept as the first event rather than being silently dropped, and
        only its ``trace_id`` is used for the trace.

        Args:
            path: File to read.

        Returns:
            The reconstructed trace.

        Raises:
            OSError: If the file cannot be read.
            json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        with open(path, encoding="utf-8") as f:
            records = [json.loads(line) for line in f if line.strip()]
        if not records:
            return cls()

        first = records[0]
        if "kind" in first and first.get("_type") != "trace_header":
            # Header-less file: every record is an event.
            header = {"trace_id": first.get("trace_id", "")}
            event_records = records
        else:
            header = first
            event_records = records[1:]

        trace = cls(
            trace_id=header.get("trace_id", ""),
            purpose=header.get("purpose", ""),
            run_mode=header.get("run_mode", "learning_train"),
            started_at=header.get("started_at", 0),
            finished_at=header.get("finished_at", 0),
            metadata=header.get("metadata", {}),
        )
        trace.events.extend(
            TraceEvent(
                kind=record.get("kind", ""),
                step=record.get("step", 0),
                timestamp=record.get("timestamp", 0),
                data=record.get("data", {}),
                trace_id=record.get("trace_id", ""),
                event_id=record.get("event_id", ""),
            )
            for record in event_records
        )
        return trace

    @classmethod
    def load_many(cls, directory: str | Path, glob: str = "*.jsonl") -> list["Trace"]:
        """Load every trace in a directory that matches a glob pattern.

        Files are visited in sorted path order. Loading is best-effort: a
        file that fails to load is logged as a warning and skipped.

        Args:
            directory: Directory to scan (not recursive unless the pattern is).
            glob: Filename pattern passed to ``Path.glob``.

        Returns:
            The traces that loaded successfully.
        """
        traces = []
        for p in sorted(Path(directory).glob(glob)):
            try:
                traces.append(cls.load(str(p)))
            except Exception as e:
                # Deliberately broad: one corrupt file must not abort the
                # whole batch. Lazy %-args avoid formatting when disabled.
                logger.warning("Failed to load trace %s: %s", p, e)
        return traces
|