# darkfire514's picture
# Upload 160 files
# 399b80c verified
"""ExecutionAnalyzer — post-execution analysis and skill quality tracking.
Responsibilities:
1. After each task execution, load recording artifacts.
2. Build an LLM prompt and obtain an ``ExecutionAnalysis``.
3. Persist the analysis and update ``SkillRecord`` counters via ``SkillStore``.
4. Surface evolution candidates for downstream processing.
Integration:
Instantiated once during ``OpenSpace.initialize()``.
``analyze_execution()`` is called in the ``finally`` block of ``OpenSpace.execute()``.
"""
from __future__ import annotations
import copy
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from openspace.grounding.core.tool import BaseTool
from .types import (
EvolutionSuggestion,
EvolutionType,
ExecutionAnalysis,
SkillCategory,
SkillJudgment,
)
from .store import SkillStore
from openspace.prompts import SkillEnginePrompts
from openspace.utils.logging import Logger
from .conversation_formatter import format_conversations
if TYPE_CHECKING:
    # Needed only for annotations. With ``from __future__ import annotations``
    # in effect, annotations are strings, so these names are not required at
    # runtime.
    from openspace.llm import LLMClient
    from openspace.grounding.core.quality import ToolQualityManager
    from .registry import SkillRegistry
logger = Logger.get_logger(__name__)
# Maximum characters of conversation log to include in the analysis prompt
# (passed as the budget to ``format_conversations``).
_MAX_CONVERSATION_CHARS = 80_000
# Per-section truncation limits.
# NOTE(review): none of these four constants is referenced in this module;
# presumably they are imported elsewhere (e.g. by the conversation
# formatter) — confirm before changing or removing them.
_TOOL_ERROR_MAX_CHARS = 1000  # Errors: keep key info, no full stack traces
_TOOL_SUCCESS_MAX_CHARS = 800  # Success results
_TOOL_ARGS_MAX_CHARS = 500  # Tool call arguments
_TOOL_SUMMARY_MAX_CHARS = 1500  # Embedded execution summaries from inner agents
# Skill & analysis-agent constants
_SKILL_CONTENT_MAX_CHARS = 8000  # Max chars per skill SKILL.md in prompt
_ANALYSIS_MAX_ITERATIONS = 5  # Max tool-calling rounds for analysis agent
def _correct_skill_ids(
ids: List[str], known_ids: set,
) -> List[str]:
"""Best-effort correction of LLM-hallucinated skill IDs.
LLMs frequently garble the hex suffix of generated IDs (e.g. swap
``cb`` → ``bc``). For each *id* not in *known_ids*, find the closest
known ID sharing the same name prefix (before ``__``) and within
edit-distance ≤ 3. If a unique match is found, silently replace it.
"""
if not known_ids:
return ids
corrected: List[str] = []
for raw_id in ids:
if raw_id in known_ids:
corrected.append(raw_id)
continue
# Extract name prefix (everything before the first "__")
prefix = raw_id.split("__")[0] if "__" in raw_id else ""
# Candidates: known IDs sharing the same name prefix
candidates = [
k for k in known_ids
if prefix and k.split("__")[0] == prefix
]
best, best_dist = None, 4 # threshold: edit distance ≤ 3
for cand in candidates:
d = _edit_distance(raw_id, cand)
if d < best_dist:
best, best_dist = cand, d
if best is not None:
logger.info(
f"Corrected LLM skill ID: {raw_id!r}{best!r} "
f"(edit_distance={best_dist})"
)
corrected.append(best)
else:
corrected.append(raw_id) # keep as-is; evolver will warn
return corrected
def _edit_distance(a: str, b: str) -> int:
"""Levenshtein edit distance (compact DP, O(min(m,n)) space)."""
if len(a) < len(b):
a, b = b, a
if not b:
return len(a)
prev = list(range(len(b) + 1))
for i, ca in enumerate(a, 1):
curr = [i] + [0] * len(b)
for j, cb in enumerate(b, 1):
curr[j] = min(
prev[j] + 1,
curr[j - 1] + 1,
prev[j - 1] + (0 if ca == cb else 1),
)
prev = curr
return prev[-1]
class ExecutionAnalyzer:
    """Analyzes task execution results and tracks skill quality.

    Args:
        store: Persistence layer for skill records and analyses.
        llm_client: LLM client used for the analysis call.
        model: Override model for analysis. If None, uses ``llm_client``'s
            default model.
        enabled: Set False to skip analysis entirely.
        skill_registry: Optional registry used to load each selected skill's
            ``SKILL.md`` from disk for the prompt.
        quality_manager: Optional ``ToolQualityManager`` that receives
            LLM-identified tool issues after each analysis.
    """

    def __init__(
        self,
        store: SkillStore,
        llm_client: "LLMClient",
        model: Optional[str] = None,
        enabled: bool = True,
        skill_registry: Optional["SkillRegistry"] = None,
        quality_manager: Optional["ToolQualityManager"] = None,
    ) -> None:
        self._store = store
        self._llm_client = llm_client
        self._model = model
        self.enabled = enabled
        self._skill_registry = skill_registry
        self._quality_manager = quality_manager

    async def analyze_execution(
        self,
        task_id: str,
        recording_dir: str,
        execution_result: Dict[str, Any],
        available_tools: Optional[List[BaseTool]] = None,
    ) -> Optional[ExecutionAnalysis]:
        """Run LLM analysis on a completed task and persist the result.

        Best-effort: failures are logged and reported as a ``None`` return
        rather than raised. The module docstring states this is called from
        the ``finally`` block of ``OpenSpace.execute()``, where a raised
        exception would replace the one already propagating.

        Args:
            task_id: Unique identifier for the task.
            recording_dir: Path to the recording directory containing
                metadata.json, conversations.jsonl, etc.
            execution_result: The return value of ``OpenSpace.execute()`` —
                contains status, iterations, skills_used, etc.
            available_tools: BaseTool instances from the execution (shell
                tools, MCP tools, etc.). Passed unchanged to the analysis
                agent loop so it can reproduce errors or verify results when
                trace data is ambiguous. No extra tools are appended here.

        Returns:
            The new (or previously stored) ``ExecutionAnalysis``, or ``None``
            when analysis is disabled, artifacts are missing, or a step fails.
        """
        if not self.enabled:
            return None

        # An empty path would otherwise resolve to the current directory.
        rec_path = Path(recording_dir) if recording_dir else None
        if rec_path is None or not rec_path.is_dir():
            logger.warning(
                f"Recording directory not found, skipping analysis: {recording_dir}"
            )
            return None

        # Check for duplicate — one analysis per task. Guarded so a store
        # failure cannot escape this best-effort method.
        try:
            existing = self._store.load_analyses_for_task(task_id)
        except Exception as e:
            logger.error(
                f"Failed to look up existing analysis for task {task_id}: {e}"
            )
            return None
        if existing is not None:
            logger.debug(f"Analysis already exists for task {task_id}, skipping")
            return existing

        # Optional token accounting; the module only exists in benchmark runs.
        try:
            from gdpval_bench.token_tracker import set_call_source, reset_call_source
            _src_tok = set_call_source("analyzer")
        except ImportError:
            _src_tok = None

        try:
            # 1. Load recording artifacts
            context = self._load_recording_context(rec_path, execution_result or {})
            if context is None:
                return None

            # 2. Build prompt
            prompt = self._build_analysis_prompt(context)

            # 3. Run analysis (agent loop with optional tool use)
            raw_json = await self._run_analysis_loop(
                prompt, available_tools=available_tools or [],
            )
            if raw_json is None:
                return None

            # 4. Parse into ExecutionAnalysis
            analysis = self._parse_analysis(task_id, raw_json, context)
            if analysis is None:
                return None

            # 5. Persist
            await self._store.record_analysis(analysis)
            evo_types = [s.evolution_type.value for s in analysis.evolution_suggestions]
            logger.info(
                f"Execution analysis saved for task {task_id}: "
                f"completed={analysis.task_completed}, "
                f"skills_judged={len(analysis.skill_judgments)}, "
                f"evolution_suggestions={evo_types or 'none'}"
            )

            # 6. Feed tool issues to quality manager (if available).
            # Build tool-status map from raw traj records for dedup.
            traj_tool_status = self._build_tool_status_map(
                context.get("traj_records", [])
            )
            await self._record_tool_quality_feedback(analysis, traj_tool_status)
            return analysis
        except Exception as e:
            logger.error(f"Execution analysis failed for task {task_id}: {e}")
            return None
        finally:
            if _src_tok is not None:
                reset_call_source(_src_tok)

    async def get_evolution_candidates(
        self, limit: int = 20
    ) -> List[ExecutionAnalysis]:
        """Return recent analyses flagged as evolution candidates."""
        return self._store.load_evolution_candidates(limit=limit)

    @staticmethod
    def _build_tool_status_map(
        traj_records: List[Dict[str, Any]],
    ) -> Dict[str, bool]:
        """Build ``{tool_key: has_any_success}`` from raw traj records.

        Used for deduplication: if all calls for a tool already failed
        (rule-based caught them), there's no need for the LLM to add
        another failure record. Any status other than ``"error"`` counts as
        a success.
        """
        tool_has_success: Dict[str, bool] = {}
        for entry in traj_records:
            backend = entry.get("backend", "unknown")
            tool = entry.get("tool", "unknown")
            server = entry.get("server", "")
            result = entry.get("result")
            status = result.get("status", "unknown") if isinstance(result, dict) else "unknown"
            # Build canonical key matching the prompt format
            key = f"{backend}:{server}:{tool}" if server else f"{backend}:{tool}"
            if key not in tool_has_success:
                tool_has_success[key] = False
            if status != "error":
                tool_has_success[key] = True
        return tool_has_success

    async def _record_tool_quality_feedback(
        self,
        analysis: ExecutionAnalysis,
        traj_tool_status: Dict[str, bool],
    ) -> None:
        """Feed LLM-identified tool issues to the ToolQualityManager.

        **Deduplication**: The rule-based system already records each tool
        call as success/failure. The LLM adds value only when it catches
        *semantic* failures the rule-based system missed.

        ``traj_tool_status`` maps ``tool_key → has_any_success_call``.
        - If all calls already failed → skip (rule-based caught it).
        - If any call was "success" but LLM says problematic → inject correction.
        - If tool not in traj → trust LLM (internal/system call).
        """
        if not self._quality_manager or not analysis.tool_issues:
            return
        try:
            filtered_issues: List[str] = []
            for issue in analysis.tool_issues:
                if not isinstance(issue, str):
                    continue
                # Extract key from "key — description"
                if "—" in issue:
                    key_part = issue.split("—", 1)[0].strip()
                elif " - " in issue:
                    key_part = issue.split(" - ", 1)[0].strip()
                else:
                    key_part = issue.strip()

                if key_part in traj_tool_status and not traj_tool_status[key_part]:
                    logger.debug(
                        f"Skipping LLM issue for {key_part}: "
                        f"rule-based already recorded all calls as errors"
                    )
                    continue
                filtered_issues.append(issue)

            if not filtered_issues:
                return

            updated = await self._quality_manager.record_llm_tool_issues(
                tool_issues=filtered_issues,
                task_id=analysis.task_id,
            )
            if updated:
                logger.debug(
                    f"Fed {updated} LLM tool issue(s) to ToolQualityManager "
                    f"(filtered from {len(analysis.tool_issues)} total) "
                    f"for task {analysis.task_id}"
                )
        except Exception as e:
            # Quality feedback is best-effort; never break analysis flow
            logger.debug(f"Tool quality feedback failed: {e}")

    def _load_recording_context(
        self,
        rec_path: Path,
        execution_result: Dict[str, Any],
    ) -> Optional[Dict[str, Any]]:
        """Load and structure all recording artifacts needed for analysis.

        Returns a dict with keys used by ``_build_analysis_prompt()``, or
        None if critical files (metadata.json, conversations.jsonl) are
        missing or unusable.
        """
        # metadata.json (always present)
        metadata_file = rec_path / "metadata.json"
        if not metadata_file.exists():
            logger.warning(f"metadata.json not found in {rec_path}")
            return None
        try:
            metadata = json.loads(metadata_file.read_text(encoding="utf-8"))
        except Exception as e:
            logger.warning(f"Failed to read metadata.json: {e}")
            return None
        if not isinstance(metadata, dict):
            logger.warning(f"metadata.json in {rec_path} is not a JSON object")
            return None

        # conversations.jsonl (primary analysis source)
        conversations = self._read_jsonl(rec_path / "conversations.jsonl")
        if not conversations:
            logger.warning(f"No conversations found in {rec_path}, skipping analysis")
            return None

        # traj.jsonl (structured tool execution records)
        traj_records = self._load_traj_data(rec_path)

        # Nested metadata sections may be absent *or* explicitly null,
        # hence ``or {}`` / ``or []`` rather than ``.get(key, default)``.
        skill_selection = metadata.get("skill_selection") or {}
        task_description = metadata.get(
            "task_description", skill_selection.get("task", ""),
        )
        if not task_description:
            task_description = execution_result.get("instruction", "")
        selected_skills = skill_selection.get("selected") or []
        retrieved_tools = metadata.get("retrieved_tools") or {}
        tool_defs = [
            t for t in (retrieved_tools.get("tools") or []) if isinstance(t, dict)
        ]
        tool_names = [t.get("name", "") for t in tool_defs]

        # Extract skill content from conversations setup message
        # selected_skills contains skill_ids (stored in metadata by tool_layer)
        skill_contents: Dict[str, str] = {}
        for conv in conversations:
            if conv.get("type") == "setup":
                for msg in conv.get("messages") or []:
                    if not isinstance(msg, dict):
                        continue
                    content = msg.get("content", "")
                    if isinstance(content, str) and "# Active Skills" in content:
                        skill_contents = self._extract_skill_contents(
                            content, selected_skills
                        )
                        break

        # Execution status — prefer runtime result, fall back to persisted metadata
        status = execution_result.get("status", "")
        iterations = execution_result.get("iterations", 0)
        if not status:
            outcome = metadata.get("execution_outcome") or {}
            status = outcome.get("status", "unknown")
            iterations = iterations or outcome.get("iterations", 0)

        # Derive actually-used tools from traj.jsonl
        # traj_records tells us exactly which tools were invoked; retrieved_tools
        # is the broader set that was *available* to the agent.
        used_tool_keys: set = set()
        for entry in traj_records:
            backend = entry.get("backend", "")
            tool = entry.get("tool", "")
            server = entry.get("server", "")
            if tool:
                used_tool_keys.add(f"{backend}:{tool}")
                if server:
                    used_tool_keys.add(f"{backend}:{server}:{tool}")

        return {
            "task_id": metadata.get("task_id", ""),
            "task_description": task_description,
            "selected_skills": selected_skills,
            "skill_selection": skill_selection,
            "skill_contents": skill_contents,
            "tool_names": tool_names,
            "tool_defs": tool_defs,
            "used_tool_keys": used_tool_keys,
            "conversations": conversations,
            "traj_records": traj_records,
            "execution_status": status,
            "iterations": iterations,
            "recording_dir": str(rec_path),
        }

    @staticmethod
    def _read_jsonl(path: Path) -> List[Dict[str, Any]]:
        """Parse a JSONL file into a list of JSON objects.

        Blank lines are ignored. Malformed or non-object lines are skipped
        individually and counted in one warning. This way a single corrupt
        line (e.g. a truncated final write) no longer discards every record
        that follows it. A missing file yields ``[]``.
        """
        records: List[Dict[str, Any]] = []
        if not path.exists():
            return records
        try:
            text = path.read_text(encoding="utf-8")
        except (OSError, ValueError) as e:
            logger.warning(f"Failed to read {path.name}: {e}")
            return records

        skipped = 0
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                skipped += 1
                continue
            if isinstance(obj, dict):
                records.append(obj)
            else:
                skipped += 1
        if skipped:
            logger.warning(f"Skipped {skipped} malformed line(s) in {path.name}")
        return records

    @staticmethod
    def _load_traj_data(rec_path: Path) -> List[Dict[str, Any]]:
        """Load traj.jsonl and return structured tool execution records.

        Each record contains: step, timestamp, backend, tool, command,
        result (status, output/stderr), parameters, extra.
        """
        return ExecutionAnalyzer._read_jsonl(rec_path / "traj.jsonl")

    @staticmethod
    def _extract_skill_contents(
        injection_text: str,
        selected_skill_ids: List[str],
    ) -> Dict[str, str]:
        """Parse the injected skill context to extract per-skill content.

        The injection text uses ``### Skill: {skill_id}`` headers, so we split
        by that pattern and keep bodies whose id is in *selected_skill_ids*.
        Each body is capped at 5000 characters.
        """
        contents: Dict[str, str] = {}
        id_set = set(selected_skill_ids)
        parts = re.split(r"###\s+Skill:\s+", injection_text)
        for part in parts[1:]:  # skip preamble
            lines = part.split("\n", 1)
            sid = lines[0].strip()
            body = lines[1] if len(lines) > 1 else ""
            if sid in id_set:
                contents[sid] = body[:5000]
        return contents

    def _load_skill_contents_from_disk(
        self, skill_ids: List[str],
    ) -> Dict[str, Dict[str, str]]:
        """Load skill SKILL.md from disk via SkillRegistry.

        Returns dict mapping ``skill_id`` → ``{"content", "dir", "description", "name"}``.
        Skills the registry doesn't know, or whose content is empty, are
        omitted. Returns ``{}`` when no registry is configured.
        """
        result: Dict[str, Dict[str, str]] = {}
        if not self._skill_registry or not skill_ids:
            return result

        for sid in skill_ids:
            meta = self._skill_registry.get_skill(sid)
            if not meta:
                continue
            content = self._skill_registry.load_skill_content(sid)
            if not content:
                continue

            skill_dir = str(meta.path.parent)
            if len(content) > _SKILL_CONTENT_MAX_CHARS:
                content = (
                    content[:_SKILL_CONTENT_MAX_CHARS]
                    + f"\n\n... [truncated at {_SKILL_CONTENT_MAX_CHARS} chars — "
                    f"use read_file(\"{meta.path}\") to see full content]"
                )

            result[sid] = {
                "content": content,
                "dir": skill_dir,
                "description": meta.description,
                "name": meta.name,
            }
        return result

    def _build_analysis_prompt(self, context: Dict[str, Any]) -> str:
        """Build the LLM prompt for execution analysis.

        *context* is the dict produced by ``_load_recording_context()``;
        ``context["selected_skills"]`` contains true ``skill_id`` values.
        """
        # Format conversation log (priority-based truncation)
        conv_text = self._format_conversations(context["conversations"])

        # Format traj.jsonl tool execution summary
        traj_section = self._format_traj_summary(context["traj_records"])

        # Skill section — keyed by skill_id throughout
        selected_skill_ids: List[str] = context["selected_skills"]
        skill_data = self._load_skill_contents_from_disk(selected_skill_ids)

        # Per-skill fallback: any selected skill the registry could not load
        # (all of them when there is no registry) uses the content extracted
        # from the conversation's skill-injection message instead.
        injected = context.get("skill_contents") or {}
        for sid in selected_skill_ids:
            if sid in skill_data:
                continue
            content = injected.get(sid)
            if content:
                skill_data[sid] = {
                    "content": content,
                    "dir": "(unknown)",
                    "description": "",
                    "name": sid,
                }
        # Present skills in selection order regardless of their source.
        skill_data = {
            sid: skill_data[sid] for sid in selected_skill_ids if sid in skill_data
        }

        skill_section = ""
        if skill_data:
            parts = []
            for sid, info in skill_data.items():
                desc_line = (
                    f"\n**Description**: {info['description']}"
                    if info.get("description") else ""
                )
                display_name = info.get("name", sid)
                parts.append(
                    f"### {sid}\n"
                    f"**Name**: {display_name}\n"
                    f"**Directory**: `{info['dir']}`{desc_line}\n\n"
                    f"{info['content']}"
                )
            skill_section = "## Selected Skills\n\n" + "\n\n---\n\n".join(parts)
        # If no skills selected → skill_section stays "" (omitted from prompt)

        # Tool list
        tool_list = self._format_tool_list(
            context.get("tool_defs", []),
            context.get("used_tool_keys", set()),
        )

        # Resource info (recording dir + skill dirs)
        rec_dir = context.get("recording_dir", "")
        resource_lines: List[str] = []
        if rec_dir:
            resource_lines.append(f"**Recording directory**: `{rec_dir}`")
            rec_path = Path(rec_dir)
            if rec_path.is_dir():
                files = [f.name for f in sorted(rec_path.iterdir()) if f.is_file()]
                if files:
                    resource_lines.append(f"  Files: {', '.join(files)}")

        skill_dirs = {
            sid: info["dir"]
            for sid, info in skill_data.items()
            if info.get("dir") and info["dir"] != "(unknown)"
        }
        if skill_dirs:
            resource_lines.append("**Skill directories**:")
            for sid, d in skill_dirs.items():
                resource_lines.append(f"  - {sid}: `{d}`")

        # NOTE(review): this text promises read_file/list_dir/run_shell, but the
        # tools actually offered are whatever the caller passes to
        # analyze_execution() — confirm they are always provided.
        resource_lines.append(
            "\nYou have `read_file`, `list_dir`, and `run_shell` tools for deeper "
            "investigation.\n**In most cases the trace above is sufficient** — only "
            "use tools when evidence is ambiguous or you need to verify specific details."
        )
        resource_info = "\n".join(resource_lines)

        return SkillEnginePrompts.execution_analysis(
            task_description=context["task_description"],
            execution_status=context["execution_status"],
            iterations=context["iterations"],
            tool_list=tool_list,
            skill_section=skill_section,
            conversation_log=conv_text,
            traj_summary=traj_section,
            selected_skill_ids_json=json.dumps(selected_skill_ids),
            resource_info=resource_info,
        )

    @staticmethod
    def _format_tool_list(
        tool_defs: List[Dict[str, Any]],
        used_tool_keys: Optional[set] = None,
    ) -> str:
        """Format tool definitions with usage annotation.

        Tools that appear in ``used_tool_keys`` (derived from traj.jsonl)
        are listed under "Actually used"; the rest under "Available but
        unused". This lets the analysis LLM focus on what actually happened.

        Args:
            tool_defs: Tool definitions from ``metadata.retrieved_tools.tools``.
            used_tool_keys: Set of ``"backend:tool_name"`` or
                ``"backend:server:tool_name"`` strings derived from traj.jsonl.

        Returns:
            One or two lines of text, or ``"none"`` when there are no tools.
        """
        if not tool_defs:
            return "none"
        if used_tool_keys is None:
            used_tool_keys = set()

        used_parts = []
        available_parts = []
        for t in tool_defs:
            name = t.get("name", "?")
            backend = t.get("backend", "?")
            server = t.get("server_name")
            label = f"{name} ({backend}/{server})" if server else f"{name} ({backend})"

            # Match by backend:tool or backend:server:tool
            key = f"{backend}:{name}"
            key_with_server = f"{backend}:{server}:{name}" if server else ""
            if key in used_tool_keys or key_with_server in used_tool_keys:
                used_parts.append(label)
            else:
                available_parts.append(label)

        sections = []
        if used_parts:
            sections.append(f"Actually used: {', '.join(used_parts)}")
        if available_parts:
            sections.append(f"Available but unused: {', '.join(available_parts)}")
        return "\n".join(sections) if sections else "none"

    @staticmethod
    def _format_traj_summary(traj_records: List[Dict[str, Any]]) -> str:
        """Format traj.jsonl records into a concise tool execution timeline.

        Produces one line per invocation (step, tool key, status, first error
        line for failures, and a short command excerpt). This complements the
        conversation log, which shows the agent's reasoning.
        """
        if not traj_records:
            return "(no traj.jsonl data available)"

        def _result_of(entry: Dict[str, Any]) -> Dict[str, Any]:
            # ``result`` may be missing or null; treat both as an empty result.
            res = entry.get("result")
            return res if isinstance(res, dict) else {}

        total = len(traj_records)
        lines = [f"Total tool invocations: {total}"]
        error_count = sum(
            1 for r in traj_records if _result_of(r).get("status") == "error"
        )
        if error_count:
            lines.append(f"Errors: {error_count}/{total}")
        lines.append("")  # blank line before timeline

        for entry in traj_records:
            step = entry.get("step", "?")
            backend = entry.get("backend", "?")
            tool = entry.get("tool", "?")
            server = entry.get("server", "")
            result = _result_of(entry)
            status = result.get("status", "?")

            # Commands are usually strings but may be recorded as lists/objects.
            command = entry.get("command") or ""
            if not isinstance(command, str):
                command = str(command)
            if len(command) > 150:
                command = command[:150] + "..."

            # Include server for MCP tools so key is unambiguous
            if server:
                tool_label = f"{backend}:{server}:{tool}"
            else:
                tool_label = f"{backend}:{tool}"
            line = f"  Step {step} [{tool_label}] → {status}"

            # Add error details for failed steps; an empty stderr falls back
            # to output so the message isn't lost.
            if status == "error":
                err_text = result.get("stderr") or result.get("output") or ""
                if isinstance(err_text, str) and err_text:
                    # Extract first meaningful line of error
                    error_first_line = err_text.strip().split("\n")[0][:200]
                    line += f" | {error_first_line}"

            # Add brief command context
            if command and not command.startswith("```"):
                line += f" | cmd: {command[:100]}"

            lines.append(line)

        return "\n".join(lines)

    @staticmethod
    def _format_conversations(conversations: List[Dict[str, Any]]) -> str:
        """Format conversations.jsonl into a readable text block for the LLM.

        Delegates to :func:`conversation_formatter.format_conversations` with a
        ``_MAX_CONVERSATION_CHARS`` budget.
        """
        return format_conversations(conversations, _MAX_CONVERSATION_CHARS)

    async def _run_analysis_loop(
        self,
        prompt: str,
        available_tools: Optional[List[BaseTool]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Run analysis as an agent loop with optional tool use.

        Most analyses complete in a single pass (LLM outputs JSON directly).
        When the trace is ambiguous, the LLM may call the tools passed in
        *available_tools* for deeper investigation or error reproduction. The
        final round disables tools and instructs the model to emit JSON.

        Reuses ``LLMClient.complete()`` for retry, rate-limiting, tool
        serialization, and tool execution.

        Conversations are recorded to ``conversations.jsonl`` via
        ``RecordingManager`` (agent_name="ExecutionAnalyzer") so the full
        analysis dialogue is preserved alongside the grounding trace.

        Returns:
            The parsed JSON object, or ``None`` if the LLM call fails or no
            JSON object can be extracted.
        """
        from openspace.recording import RecordingManager

        model = self._model or self._llm_client.model
        analysis_tools: List[BaseTool] = list(available_tools or [])
        messages: List[Dict[str, Any]] = [
            {"role": "user", "content": prompt},
        ]

        # Record initial conversation setup
        await RecordingManager.record_conversation_setup(
            setup_messages=copy.deepcopy(messages),
            tools=analysis_tools if analysis_tools else None,
            agent_name="ExecutionAnalyzer",
        )

        for iteration in range(_ANALYSIS_MAX_ITERATIONS):
            is_last = iteration == _ANALYSIS_MAX_ITERATIONS - 1

            # Snapshot message count before any additions + LLM call, so the
            # recorded delta includes the final-round instruction below.
            msg_count_before = len(messages)

            # On the final iteration, force JSON output (no tools).
            if is_last:
                messages.append({
                    "role": "system",
                    "content": (
                        "This is your FINAL round — no more tool calls allowed. "
                        "You MUST output the JSON analysis object now based on "
                        "all information gathered so far."
                    ),
                })

            try:
                result = await self._llm_client.complete(
                    messages=messages,
                    tools=analysis_tools if not is_last else None,
                    execute_tools=True,
                    model=model,
                )
            except Exception as e:
                logger.error(f"Analysis LLM call failed (iter {iteration}): {e}")
                return None

            # ``content`` may be None (e.g. an empty reply); normalize so
            # _extract_json always receives a string.
            content = result["message"].get("content") or ""
            has_tool_calls = result["has_tool_calls"]

            # Record iteration delta
            updated_messages = result["messages"]
            delta = updated_messages[msg_count_before:]
            await RecordingManager.record_iteration_context(
                iteration=iteration + 1,
                delta_messages=copy.deepcopy(delta),
                response_metadata={
                    "has_tool_calls": has_tool_calls,
                    "tool_calls_count": len(result.get("tool_results", [])),
                    "is_final": not has_tool_calls,
                },
                agent_name="ExecutionAnalyzer",
            )

            if not has_tool_calls:
                # No tool calls → final response, parse JSON
                return self._extract_json(content)

            # Tools were called and executed by complete() — continue with
            # the updated messages (includes assistant + tool result messages).
            messages = updated_messages
            logger.debug(
                f"Analysis agent used tools "
                f"(iter {iteration + 1}/{_ANALYSIS_MAX_ITERATIONS})"
            )

        # Should not reach here (last iteration disables tools), but just in case
        logger.warning(
            f"Analysis agent reached max iterations ({_ANALYSIS_MAX_ITERATIONS})"
        )
        for m in reversed(messages):
            if m.get("role") == "assistant" and isinstance(m.get("content"), str) and m["content"]:
                return self._extract_json(m["content"])
        return None

    @staticmethod
    def _extract_json(text: str) -> Optional[Dict[str, Any]]:
        """Extract a JSON object from LLM response text.

        Candidates are tried in order and the first that parses to a JSON
        object wins:

        1. the body of every fenced code block (```json or bare ```),
        2. the span from the first ``{`` to the last ``}``,
        3. the whole text.

        Trying every fence (not just the first) and then falling back to the
        brace span handles responses that show an illustrative snippet before
        the real answer, or whose JSON itself contains backtick fences.
        """
        if not text:
            logger.warning("Failed to parse LLM analysis JSON: empty response")
            return None

        candidates = [
            m.group(1).strip()
            for m in re.finditer(r"```(?:json)?\s*\n?(.*?)\n?```", text, re.DOTALL)
        ]
        brace_match = re.search(r"\{.*\}", text, re.DOTALL)
        if brace_match:
            candidates.append(brace_match.group())
        candidates.append(text)

        last_error: Optional[json.JSONDecodeError] = None
        non_dict_type = None
        for candidate in candidates:
            try:
                data = json.loads(candidate)
            except json.JSONDecodeError as e:
                last_error = e
                continue
            if isinstance(data, dict):
                return data
            non_dict_type = type(data)

        if non_dict_type is not None:
            logger.warning(f"LLM returned non-dict JSON: {non_dict_type}")
        else:
            logger.warning(f"Failed to parse LLM analysis JSON: {last_error}")
        logger.debug(f"Raw LLM output (first 500 chars): {text[:500]}")
        return None

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Normalize an LLM-provided field into a list.

        ``None`` → ``[]``; lists and tuples → a list of their items; any other
        single value → a one-element list.
        """
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _coerce_bool(value: Any, default: bool = False) -> bool:
        """Interpret an LLM-provided flag as a boolean.

        JSON booleans and numbers go through ``bool()``. Strings are matched
        case-insensitively against ``true/yes/y/1``, so ``"false"`` yields
        ``False`` (plain ``bool("false")`` would be ``True``). ``None`` yields
        *default*.
        """
        if value is None:
            return default
        if isinstance(value, str):
            return value.strip().lower() in ("true", "yes", "y", "1")
        return bool(value)

    @staticmethod
    def _parse_analysis(
        task_id: str,
        data: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Optional[ExecutionAnalysis]:
        """Convert the raw LLM JSON output into an ExecutionAnalysis.

        Skill IDs in judgments and evolution targets are fuzzily corrected
        against the selected/available skill IDs in *context*. Malformed
        entries (non-objects, unknown evolution types) are skipped
        individually instead of failing the whole parse.

        Returns:
            The analysis, or ``None`` if construction fails unexpectedly.
        """
        try:
            now = datetime.now()
            as_list = ExecutionAnalyzer._as_list
            as_bool = ExecutionAnalyzer._coerce_bool

            # Collect all known skill IDs from context for fuzzy correction.
            # LLMs often garble hex suffixes when reproducing skill IDs.
            known_skill_ids: set = {
                sid for sid in as_list(context.get("selected_skills"))
                if isinstance(sid, str)
            }
            # Also include skill IDs from the skill_selection metadata
            skill_sel = context.get("skill_selection") or {}
            known_skill_ids.update(
                sid for sid in as_list(skill_sel.get("available_skills"))
                if isinstance(sid, str)
            )

            # Parse skill judgments (LLM-generated)
            judgments: List[SkillJudgment] = []
            for jd in as_list(data.get("skill_judgments")):
                if not isinstance(jd, dict):
                    logger.debug(f"Ignoring malformed skill judgment: {jd!r}")
                    continue
                raw_sid = str(jd.get("skill_id") or "")
                corrected = _correct_skill_ids([raw_sid], known_skill_ids)
                judgments.append(
                    SkillJudgment(
                        skill_id=corrected[0] if corrected else raw_sid,
                        skill_applied=as_bool(jd.get("skill_applied")),
                        note=jd.get("note") or "",
                    )
                )

            # Parse evolution_suggestions (new format: list of typed suggestions)
            suggestions: List[EvolutionSuggestion] = []
            for raw_sug in as_list(data.get("evolution_suggestions")):
                if not isinstance(raw_sug, dict):
                    logger.debug(f"Ignoring malformed evolution suggestion: {raw_sug!r}")
                    continue
                try:
                    evo_type = EvolutionType(raw_sug.get("type", ""))
                except ValueError:
                    logger.debug(f"Unknown evolution type: {raw_sug.get('type')}")
                    continue

                cat = None
                if raw_sug.get("category"):
                    try:
                        cat = SkillCategory(raw_sug["category"])
                    except ValueError:
                        logger.debug(f"Unknown category: {raw_sug.get('category')}")

                # Support both "target_skills" (list) and legacy "target_skill" (str)
                raw_targets = raw_sug.get("target_skills")
                if isinstance(raw_targets, list):
                    targets = [t for t in raw_targets if isinstance(t, str) and t]
                else:
                    legacy = raw_sug.get("target_skill", "")
                    targets = [legacy] if isinstance(legacy, str) and legacy else []

                # Correct LLM-hallucinated skill IDs against known IDs.
                # LLMs frequently swap/drop characters in hex suffixes
                # (e.g. "61f694bc" instead of "61f694cb").
                targets = _correct_skill_ids(targets, known_skill_ids)

                suggestions.append(EvolutionSuggestion(
                    evolution_type=evo_type,
                    target_skill_ids=targets,
                    category=cat,
                    direction=raw_sug.get("direction") or "",
                ))

            # Tool issues are consumed as "key — description" strings downstream.
            tool_issues = [
                issue for issue in as_list(data.get("tool_issues"))
                if isinstance(issue, str) and issue.strip()
            ]

            return ExecutionAnalysis(
                task_id=task_id,
                timestamp=now,
                task_completed=as_bool(data.get("task_completed")),
                execution_note=data.get("execution_note") or "",
                tool_issues=tool_issues,
                skill_judgments=judgments,
                evolution_suggestions=suggestions,
                analyzed_by=data.get("analyzed_by") or "",
                analyzed_at=now,
            )
        except Exception as e:
            logger.error(f"Failed to parse analysis response: {e}")
            return None

    # Convenience queries (delegated to store)

    def get_store(self) -> SkillStore:
        """Access the underlying SkillStore for direct queries."""
        return self._store

    def close(self) -> None:
        """Close the store connection."""
        self._store.close()