File size: 4,676 Bytes
7498b52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
trace.py — Structured execution traces with JSONL persistence.

Every Orchestrator step emits TraceEvents. A Trace is the full sequence
for one task. Traces are the raw material for memory extraction, debugging,
and evaluation. They are append-only and immutable once written.
"""
from __future__ import annotations

import json
import logging
import time
import uuid
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Iterator

# Module-level logger; Trace.load_many uses it to warn about trace files
# that fail to load instead of aborting the whole directory scan.
logger = logging.getLogger(__name__)


@dataclass
class TraceEvent:
    """One record in an execution trace.

    Only ``kind`` is required. ``timestamp`` defaults to the wall-clock
    time at construction and ``event_id`` to a fresh 10-character hex id.
    """
    kind: str                         # "action", "score", "tool_call", "tool_result", "error", "memory_read", "memory_write"
    step: int = 0
    timestamp: float = field(default_factory=time.time)
    data: dict[str, Any] = field(default_factory=dict)
    trace_id: str = ""
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:10])

    def to_dict(self) -> dict[str, Any]:
        """Return the event as a plain dict keyed by field name.

        The mapping is shallow: the ``data`` value is the very same dict
        object held by the event, not a copy.
        """
        # Key order here is the order the keys appear in the result.
        exported = ("kind", "step", "timestamp", "data", "trace_id", "event_id")
        return {name: getattr(self, name) for name in exported}


@dataclass
class Trace:
    """
    A complete execution trace for one task run.

    A trace is an ordered list of TraceEvents plus header information
    (id, purpose, run mode, start/finish times, free-form metadata). It can
    be saved to a JSONL file and reloaded for offline analysis, memory
    extraction, or replay.

    Traces are meant to be treated as immutable after finalization. This is
    a convention only: the class does not enforce it, and ``emit`` still
    appends after ``finalize`` has been called.
    """
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    purpose: str = ""
    run_mode: str = "learning_train"
    started_at: float = field(default_factory=time.time)
    finished_at: float = 0.0          # 0.0 means "not finalized yet"
    events: list[TraceEvent] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    def emit(self, kind: str, step: int = 0, **data: Any) -> TraceEvent:
        """Append a new event to the trace and return it.

        Args:
            kind: Event kind label stored on the event.
            step: Step number the event belongs to.
            **data: Arbitrary payload, stored as the event's ``data`` dict.

        Returns:
            The newly created event, already stamped with this trace's id.
        """
        event = TraceEvent(
            kind=kind, step=step, data=data, trace_id=self.trace_id,
        )
        self.events.append(event)
        return event

    def finalize(self) -> None:
        """Mark the trace as complete by recording the finish time."""
        self.finished_at = time.time()

    @property
    def duration_s(self) -> float:
        """Elapsed seconds: start to finish, or start to now if unfinished."""
        if self.finished_at > 0:
            return self.finished_at - self.started_at
        return time.time() - self.started_at

    @property
    def step_count(self) -> int:
        """Highest step number seen among the events (0 when empty)."""
        return max((e.step for e in self.events), default=0)

    # --- JSONL persistence ---

    def save(self, path: str) -> None:
        """Save the trace as JSONL: one header line, then one line per event.

        Values that JSON cannot represent natively are stringified
        (``default=str``). Missing parent directories are created.

        Args:
            path: Destination file; an existing file is overwritten.

        Raises:
            ValueError / TypeError: if the header or an event cannot be
                serialized (e.g. a circular reference). In that case the
                destination file is left untouched.
            OSError: if the directory or file cannot be written.
        """
        header = {
            "_type": "trace_header",
            "trace_id": self.trace_id,
            "purpose": self.purpose,
            "run_mode": self.run_mode,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            "metadata": self.metadata,
        }
        # Serialize everything BEFORE opening the file: opening with "w"
        # truncates immediately, so a serialization error afterwards would
        # destroy a previously saved trace.
        lines = [json.dumps(header, default=str)]
        lines.extend(
            json.dumps(event.to_dict(), default=str) for event in self.events
        )

        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the file does not depend on the platform locale.
        with open(target, "w", encoding="utf-8") as f:
            f.writelines(line + "\n" for line in lines)

    @classmethod
    def load(cls, path: str) -> "Trace":
        """Load a trace from a JSONL file.

        Blank lines are ignored. An empty file yields a fresh default
        trace. Missing keys fall back to defaults. If the first record is
        an event rather than a header (it has a ``kind`` key and no
        ``trace_header`` marker), it is kept as an event and the trace id
        is taken from it.

        Args:
            path: File to read.

        Returns:
            The reconstructed trace.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if a non-blank line is not valid JSON.
        """
        with open(path, encoding="utf-8") as f:
            records = [json.loads(line) for line in f if line.strip()]

        if not records:
            return cls()

        first = records[0]
        if first.get("_type") == "trace_header" or "kind" not in first:
            header, event_records = first, records[1:]
        else:
            # Headerless file: do not silently drop the first event.
            header = {"trace_id": first.get("trace_id", "")}
            event_records = records

        return cls(
            trace_id=header.get("trace_id", ""),
            purpose=header.get("purpose", ""),
            run_mode=header.get("run_mode", "learning_train"),
            started_at=header.get("started_at", 0),
            finished_at=header.get("finished_at", 0),
            events=[
                TraceEvent(
                    kind=record.get("kind", ""),
                    step=record.get("step", 0),
                    timestamp=record.get("timestamp", 0),
                    data=record.get("data", {}),
                    trace_id=record.get("trace_id", ""),
                    event_id=record.get("event_id", ""),
                )
                for record in event_records
            ],
            metadata=header.get("metadata", {}),
        )

    @classmethod
    def load_many(cls, directory: str, glob: str = "*.jsonl") -> list["Trace"]:
        """Load all traces in a directory, in sorted path order.

        Loading is best-effort: a file that fails to load is logged as a
        warning and skipped, so one bad file does not abort the batch.

        Args:
            directory: Directory to scan (non-recursive unless ``glob`` says so).
            glob: Filename pattern selecting the trace files.

        Returns:
            The successfully loaded traces.
        """
        traces = []
        for p in sorted(Path(directory).glob(glob)):
            try:
                traces.append(cls.load(str(p)))
            except Exception as e:  # deliberate catch-all at the batch boundary
                logger.warning("Failed to load trace %s: %s", p, e)
        return traces