# server/trajectory.py
"""
Full trajectory recording and deterministic replay system.
Records every action, observation, reward, file diff, and timing.
Enables post-hoc analysis and deterministic replay of agent episodes.
"""
import copy
import hashlib
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class FileDiff:
    """Represents a file change made by the agent.

    Plain data holder; serialize with dataclasses.asdict().
    """
    path: str  # Workspace-relative path of the changed file
    before: Optional[str]  # Previous content; None if file was created
    after: str  # Content after the write
    chars_changed: int  # Magnitude of the change, in characters
@dataclass
class TrajectoryStep:
    """Complete record of one agent step."""
    step_number: int
    timestamp: float  # Wall-clock time when the step was recorded
    action_type: str
    action_path: Optional[str]
    action_query: Optional[str]
    action_content_length: Optional[int]  # Don't store full content -- too large
    observation_snapshot: Dict[str, Any]  # Compact snapshot
    reward: float
    cumulative_reward: float
    done: bool
    error: Optional[str]
    file_diff: Optional[Dict[str, Any]]  # If write_file, the diff
    test_pass_rate: Optional[float]  # If run_tests, the pass rate
    duration_ms: float  # How long this step took server-side
    # default_factory gives each step its own fresh list (no shared mutable default)
    security_flags: List[str] = field(default_factory=list)
@dataclass
class TrajectoryRecord:
    """Complete episode trajectory -- everything needed for replay + analysis."""
    episode_id: str
    task: str
    variant_id: str
    start_time: float
    end_time: Optional[float] = None  # None while the episode is still running
    steps: List["TrajectoryStep"] = field(default_factory=list)
    final_score: float = 0.0
    total_steps: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Convert to JSON-serializable dict."""
        return {
            "episode_id": self.episode_id,
            "task": self.task,
            "variant_id": self.variant_id,
            "start_time": self.start_time,
            "end_time": self.end_time,
            # `is not None` (not truthiness) so a 0.0 end_time is not treated as unset
            "duration_seconds": round(self.end_time - self.start_time, 2)
            if self.end_time is not None
            else None,
            "steps": [asdict(s) for s in self.steps],
            "final_score": self.final_score,
            "total_steps": self.total_steps,
            "metadata": self.metadata,
        }
class TrajectoryLogger:
    """
    Records full agent trajectories for analysis and replay.

    Usage:
        logger = TrajectoryLogger()
        logger.start_episode("task1", "variant_3")
        logger.record_step(step_number=1, action=..., obs=..., ...)
        logger.end_episode(final_score=0.75)
        trajectory = logger.get_trajectory()
    """

    def __init__(self):
        # Episode currently being recorded (None when idle).
        self._current: Optional["TrajectoryRecord"] = None
        # Bounded buffer of the most recently finished episodes.
        self._history: List["TrajectoryRecord"] = []
        self._max_history = 10

    def _archive(self, record: "TrajectoryRecord") -> None:
        """Append a finished record to history, evicting the oldest past capacity."""
        self._history.append(record)
        if len(self._history) > self._max_history:
            self._history.pop(0)

    def _latest(self) -> Optional["TrajectoryRecord"]:
        """Return the active record, else the most recent finished one, else None."""
        if self._current:
            return self._current
        if self._history:
            return self._history[-1]
        return None

    def start_episode(self, task: str, variant_id: str) -> str:
        """Start recording a new episode. Returns episode_id."""
        # Auto-finalize a previous episode that was never explicitly ended,
        # so its steps are preserved and the history buffer stays bounded.
        if self._current and self._current.end_time is None:
            self._current.end_time = time.time()
            self._archive(self._current)
        # md5 is used only as a compact non-security identifier here.
        episode_id = hashlib.md5(
            f"{task}_{variant_id}_{time.time()}".encode()
        ).hexdigest()[:12]
        self._current = TrajectoryRecord(
            episode_id=episode_id,
            task=task,
            variant_id=variant_id,
            start_time=time.time(),
        )
        return episode_id

    def record_step(
        self,
        step_number: int,
        action_type: str,
        action_path: Optional[str],
        action_query: Optional[str],
        action_content_length: Optional[int],
        reward: float,
        cumulative_reward: float,
        done: bool,
        error: Optional[str],
        file_diff: Optional["FileDiff"],
        test_pass_rate: Optional[float],
        duration_ms: float,
        observation_compact: Dict[str, Any],
        security_flags: Optional[List[str]] = None,
    ):
        """Record one step in the current trajectory. No-op if no episode is active."""
        if not self._current:
            return
        step = TrajectoryStep(
            step_number=step_number,
            timestamp=time.time(),
            action_type=action_type,
            action_path=action_path,
            action_query=action_query,
            action_content_length=action_content_length,
            observation_snapshot=observation_compact,
            reward=reward,
            cumulative_reward=cumulative_reward,
            done=done,
            error=error,
            file_diff=asdict(file_diff) if file_diff else None,
            test_pass_rate=test_pass_rate,
            duration_ms=duration_ms,
            security_flags=security_flags or [],
        )
        self._current.steps.append(step)
        self._current.total_steps = step_number

    def end_episode(self, final_score: float):
        """Finalize the current episode and archive it in the history buffer."""
        if not self._current:
            return
        self._current.end_time = time.time()
        self._current.final_score = final_score
        self._archive(self._current)

    def get_trajectory(self) -> Optional[dict]:
        """Get the current/latest trajectory as dict."""
        record = self._latest()
        return record.to_dict() if record else None

    def get_replay_actions(self) -> List[dict]:
        """Extract action sequence for deterministic replay."""
        record = self._latest()
        if not record:
            return []
        actions = []
        for step in record.steps:
            action = {"action_type": step.action_type}
            if step.action_path:
                action["path"] = step.action_path
            if step.action_query:
                action["query"] = step.action_query
            # Note: content not stored in trajectory -- replay requires re-supplying it
            actions.append(action)
        return actions

    def get_step_timeline(self) -> List[dict]:
        """Get compact timeline of actions and outcomes for visualization.

        Falls back to the most recent finished episode, consistent with
        get_trajectory / get_replay_actions.
        """
        record = self._latest()
        if not record:
            return []
        return [
            {
                "step": step.step_number,
                "action": step.action_type,
                "path": step.action_path,
                "reward": step.reward,
                "error": step.error,
                "duration_ms": step.duration_ms,
                "pass_rate": step.test_pass_rate,
                "security_flags": step.security_flags,
            }
            for step in record.steps
        ]

    def get_history_summary(self) -> List[dict]:
        """Get summary of recent episodes."""
        return [
            {
                "episode_id": record.episode_id,
                "task": record.task,
                "variant_id": record.variant_id,
                "final_score": record.final_score,
                "total_steps": record.total_steps,
                # `is not None` so a 0.0 end_time still yields a duration
                "duration_seconds": round(record.end_time - record.start_time, 2)
                if record.end_time is not None
                else None,
            }
            for record in self._history
        ]