# purpose_agent/robust_parser.py
# fix: universal parsing + OpenRouter + state bug (commit d6bcdbc)
"""
robust_parser.py — Universal LLM output parser that never requires JSON.
The problem: LLMs are unreliable at producing valid JSON. Different models
format differently. Structured output (json_schema) isn't supported everywhere.
The solution: Parse whatever the LLM gives you. Extract fields by multiple
strategies, fall back gracefully, and always return something usable.
This replaces the fragile generate_structured → json.loads → crash pattern.
"""
from __future__ import annotations
import json
import re
import logging
from typing import Any
logger = logging.getLogger(__name__)
def extract_json(text: str) -> dict[str, Any] | None:
    """
    Try to extract a JSON object from arbitrary LLM text.

    Handles: pure JSON, JSON in markdown code blocks, JSON embedded in prose.

    Returns the parsed dict, or None if no valid JSON *object* is found.
    Non-object JSON (arrays, numbers, bare strings) is rejected so that
    callers can safely treat any non-None result as a mapping.
    """
    text = text.strip()
    # Strategy 1: the entire text is a JSON document.
    try:
        obj = json.loads(text)
        # json.loads also accepts arrays/numbers/strings; only objects count.
        if isinstance(obj, dict):
            return obj
    except (json.JSONDecodeError, ValueError):
        pass
    # Strategy 2: JSON inside a markdown code block (``` or ```json).
    m = re.search(r'```(?:json)?\s*(\{.*\})\s*```', text, re.DOTALL)
    if m:
        try:
            obj = json.loads(m.group(1))
            if isinstance(obj, dict):
                return obj
        except (json.JSONDecodeError, ValueError):
            pass
    # Strategy 3: find the outermost { ... } by brace matching.
    # NOTE: braces inside JSON string values will confuse the depth counter;
    # this is a best-effort fallback, not a full parser.
    start = text.find('{')
    if start >= 0:
        depth = 0
        for i in range(start, len(text)):
            if text[i] == '{':
                depth += 1
            elif text[i] == '}':
                depth -= 1
                if depth == 0:
                    try:
                        obj = json.loads(text[start:i + 1])
                        if isinstance(obj, dict):
                            return obj
                    except (json.JSONDecodeError, ValueError):
                        pass
                    break
    return None
def extract_field(text: str, field_name: str, default: str = "") -> str:
    """
    Extract a named field value from LLM text, regardless of format.

    Handles:
      - JSON:       {"field": "value"}
      - Markdown:   **field:** value / field: value
      - Labeled:    FIELD: value
      - Assignment: field = value

    Returns `default` when the field cannot be found.
    """
    # Try structured JSON first — most reliable when present.
    obj = extract_json(text)
    if isinstance(obj, dict) and field_name in obj:
        return str(obj[field_name])
    # Escape the field name before interpolating it into regexes: callers
    # pass names containing metacharacters (e.g. "Φ(state_before)") which
    # would otherwise create spurious groups or fail to match literally.
    name = re.escape(field_name)
    patterns = [
        rf'"{name}"\s*:\s*"((?:[^"\\]|\\.)*)"',      # JSON string
        rf'"{name}"\s*:\s*(\d+\.?\d*)',              # JSON number
        rf'\*?\*?{name}\*?\*?\s*:\s*(.+?)(?:\n|$)',  # Markdown/label
        rf'{name}\s*[=:]\s*(.+?)(?:\n|$)',           # Assignment
    ]
    for pattern in patterns:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            # Trim surrounding whitespace and stray quote characters.
            return m.group(1).strip().strip('"').strip("'")
    return default
def extract_number(text: str, field_name: str, default: float = 0.0) -> float:
    """
    Extract a numeric field from LLM text.

    Tries extract_field first, then a direct `field = X` / `field: X`
    regex. Returns `default` when nothing parseable is found.
    """
    val = extract_field(text, field_name)
    if val:
        try:
            # Tolerate trailing punctuation such as "7.5." or "7,".
            return float(val.rstrip('.,'))
        except (ValueError, TypeError):
            pass
    # Direct pattern: field_name = X.X or field_name: X.X. The name is
    # escaped because callers pass names with regex metacharacters
    # (e.g. "Φ(state_after)").
    m = re.search(rf'{re.escape(field_name)}\s*[=:]\s*([\d.]+)', text, re.IGNORECASE)
    if m:
        try:
            return float(m.group(1).rstrip('.'))
        except ValueError:
            pass
    return default
def extract_code(text: str) -> str:
    """
    Extract Python code from LLM text.

    Handles:
      - Code in a "code" JSON field (top-level or nested at action.params.code)
      - Code in ``` / ```python blocks
      - Raw code beginning with def/class/import/from

    Returns "" when no code is found.
    """
    # Strategy 1: JSON with a code field. Guard with isinstance because
    # extract_json may yield non-dict JSON, which has no .get().
    obj = extract_json(text)
    if isinstance(obj, dict):
        # Nested: action.params.code
        action = obj.get("action", {})
        if isinstance(action, dict):
            params = action.get("params", {})
            if isinstance(params, dict) and "code" in params:
                return params["code"]
        if "code" in obj:
            return obj["code"]
    # Strategy 2: fenced code block (``` or ```python).
    m = re.search(r'```(?:python)?\s*\n(.*?)```', text, re.DOTALL)
    if m:
        return m.group(1).strip()
    # Strategy 3: heuristically collect lines that look like code, starting
    # at the first def/class/import/from line and stopping when prose resumes.
    lines = text.split('\n')
    code_lines = []
    in_code = False
    for line in lines:
        if re.match(r'^(def |class |import |from )', line.strip()):
            in_code = True
        if in_code:
            # Blank line after non-block-opening code: keep it — it may be an
            # intra-function blank; the final else ends collection otherwise.
            if line.strip() == '' and code_lines and not code_lines[-1].strip().endswith(':'):
                code_lines.append(line)
            elif (line.startswith(' ') or line.startswith('\t') or
                    re.match(r'^(def |class |import |from |#|$)', line.strip())):
                # Indented continuation, new top-level statement, or comment.
                code_lines.append(line)
            elif re.match(r'^(def |class )', line.strip()):
                code_lines.append(line)
            else:
                # Non-code text after we collected something: stop.
                if code_lines:
                    break
    if code_lines:
        return '\n'.join(code_lines).strip()
    return ""
def parse_actor_response(text: str) -> dict[str, Any]:
    """
    Parse an actor's response into thought/action/expected_delta.

    Works with any format the LLM produces. A well-formed JSON object that
    already carries "action" or "thought" is returned as-is; otherwise the
    fields are scraped individually and a dict with "thought", "action"
    ({"name", "params"}), and "expected_delta" keys is assembled.
    """
    # Try JSON first (best case). isinstance guard: extract_json may yield
    # a bare JSON string, for which `"action" in obj` is a substring test.
    obj = extract_json(text)
    if isinstance(obj, dict) and ("action" in obj or "thought" in obj):
        return obj
    # Extract fields individually.
    thought = extract_field(text, "thought")
    expected_delta = extract_field(text, "expected_delta")
    # Extract action name.
    action_name = extract_field(text, "name", "")
    if not action_name:
        action_name = extract_field(text, "action", "")
    if action_name and action_name.startswith("{"):
        action_name = ""  # It's a JSON object, not a name
    # Extract code if this is a coding task.
    code = extract_code(text)
    # Build action.
    action = {"name": action_name or "UNKNOWN", "params": {}}
    if code:
        # BUGFIX: the old `action.get("name", "submit_code")` could never
        # produce "submit_code" because the key always exists; default the
        # action name explicitly when code was found but no name was.
        if action["name"] == "UNKNOWN":
            action["name"] = "submit_code"
        action["params"]["code"] = code
    if not thought:
        # Fall back to the first line (truncated to 200 chars) as the thought.
        thought = text.split('\n')[0][:200] if text else ""
    return {
        "thought": thought,
        "action": action,
        "expected_delta": expected_delta or "",
    }
def _safe_float(value: Any, default: float = 0.0) -> float:
    """Coerce an LLM-provided value to float, returning `default` on failure."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def parse_critic_response(text: str) -> dict[str, Any]:
    """
    Parse a critic's response into phi_before/phi_after/reasoning/evidence/confidence.

    Works with any format. Scores are clamped to [0, 10] and confidence to
    [0, 1] on every path; non-numeric values (e.g. "N/A") fall back to
    defaults instead of raising, per this module's never-crash contract.
    """
    # Try JSON first.
    obj = extract_json(text)
    if isinstance(obj, dict) and ("phi_before" in obj or "phi_after" in obj):
        return {
            # Clamp here too, for consistency with the text-extraction path.
            "phi_before": min(10.0, max(0.0, _safe_float(obj.get("phi_before", 0)))),
            "phi_after": min(10.0, max(0.0, _safe_float(obj.get("phi_after", 0)))),
            "reasoning": str(obj.get("reasoning", "")),
            "evidence": str(obj.get("evidence", "")),
            "confidence": min(1.0, max(0.0, _safe_float(obj.get("confidence", 0.5), 0.5))),
        }
    # Extract scores from free text, trying several label spellings.
    phi_before = extract_number(text, "phi_before", 0.0)
    if phi_before == 0.0:
        phi_before = extract_number(text, "Φ(state_before)", 0.0)
    if phi_before == 0.0:
        phi_before = extract_number(text, "state_before", 0.0)
    phi_after = extract_number(text, "phi_after", 0.0)
    if phi_after == 0.0:
        phi_after = extract_number(text, "Φ(state_after)", 0.0)
    if phi_after == 0.0:
        phi_after = extract_number(text, "state_after", 0.0)
    # Last resort: SCORE: X pattern — two scores mean before/after, one means after.
    if phi_before == 0.0 and phi_after == 0.0:
        scores = re.findall(r'(?:score|SCORE|Score)\s*[=:]\s*([\d.]+)', text)
        if len(scores) >= 2:
            # _safe_float: the capture may be only dots ("score: ..."),
            # which float() would reject.
            phi_before = _safe_float(scores[0].rstrip('.'))
            phi_after = _safe_float(scores[1].rstrip('.'))
        elif len(scores) == 1:
            phi_after = _safe_float(scores[0].rstrip('.'))
    reasoning = extract_field(text, "reasoning")
    evidence = extract_field(text, "evidence")
    confidence = extract_number(text, "confidence", 0.5)
    if not reasoning:
        reasoning = text[:300]
    if not evidence:
        evidence = text[300:500] if len(text) > 300 else ""
    return {
        "phi_before": min(10.0, max(0.0, phi_before)),
        "phi_after": min(10.0, max(0.0, phi_after)),
        "reasoning": reasoning,
        "evidence": evidence,
        "confidence": min(1.0, max(0.0, confidence)),
    }
def parse_optimizer_response(text: str) -> dict[str, Any]:
    """
    Parse optimizer output into a {"heuristics": [...]} dict.

    Fallback order: JSON object with "heuristics" → bare JSON array →
    labelled pattern/strategy pairs → numbered list items (capped at 5).
    """
    # isinstance guard: extract_json may yield non-dict JSON, for which
    # `"heuristics" in obj` would be a substring test or a TypeError.
    obj = extract_json(text)
    if isinstance(obj, dict) and "heuristics" in obj:
        return obj
    # Try to find a JSON array anywhere in the text (greedy: first '[' to
    # last ']').
    m = re.search(r'\[.*\]', text, re.DOTALL)
    if m:
        try:
            arr = json.loads(m.group())
            if isinstance(arr, list):
                return {"heuristics": arr}
        except (json.JSONDecodeError, ValueError):
            pass
    # Extract labelled pattern/strategy pairs from prose.
    heuristics = []
    patterns = re.findall(r'(?:pattern|when|if)\s*[:\-]\s*(.+?)(?:\n|$)', text, re.IGNORECASE)
    strategies = re.findall(r'(?:strategy|do|then|action)\s*[:\-]\s*(.+?)(?:\n|$)', text, re.IGNORECASE)
    for pat, strat in zip(patterns, strategies):
        heuristics.append({"tier": "strategic", "pattern": pat.strip(), "strategy": strat.strip()})
    # If nothing found, fall back to numbered list items (at most 5).
    if not heuristics:
        items = re.findall(r'\d+\.\s*(.+?)(?:\n|$)', text)
        for item in items[:5]:
            heuristics.append({"tier": "strategic", "pattern": "General", "strategy": item.strip()})
    return {"heuristics": heuristics}