| """ |
| robust_parser.py — Universal LLM output parser that never requires JSON. |
| |
| The problem: LLMs are unreliable at producing valid JSON. Different models |
| format differently. Structured output (json_schema) isn't supported everywhere. |
| |
| The solution: Parse whatever the LLM gives you. Extract fields by multiple |
| strategies, fall back gracefully, and always return something usable. |
| |
| This replaces the fragile generate_structured → json.loads → crash pattern. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import re |
| import logging |
| from typing import Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def extract_json(text: str) -> dict[str, Any] | None: |
| """ |
| Try to extract a JSON object from arbitrary LLM text. |
| Handles: pure JSON, JSON in code blocks, JSON embedded in prose. |
| Returns None if no valid JSON found. |
| """ |
| text = text.strip() |
|
|
| |
| try: |
| return json.loads(text) |
| except (json.JSONDecodeError, ValueError): |
| pass |
|
|
| |
| m = re.search(r'```(?:json)?\s*(\{.*\})\s*```', text, re.DOTALL) |
| if m: |
| try: |
| return json.loads(m.group(1)) |
| except (json.JSONDecodeError, ValueError): |
| pass |
|
|
| |
| start = text.find('{') |
| if start >= 0: |
| depth = 0 |
| for i in range(start, len(text)): |
| if text[i] == '{': |
| depth += 1 |
| elif text[i] == '}': |
| depth -= 1 |
| if depth == 0: |
| try: |
| return json.loads(text[start:i + 1]) |
| except (json.JSONDecodeError, ValueError): |
| break |
|
|
| return None |
|
|
|
|
def extract_field(text: str, field_name: str, default: str = "") -> str:
    """
    Extract a named field value from LLM text, regardless of format.

    Handles:
    - JSON: {"field": "value"}
    - Markdown: **field:** value / field: value
    - Labeled: FIELD: value
    - key=value lines

    Args:
        text: raw LLM output.
        field_name: field to look for, matched case-insensitively. May
            contain regex metacharacters (e.g. "Φ(state_before)"); they
            are matched literally.
        default: value returned when the field cannot be found.
    """
    # Strategy 1: a proper JSON object containing the field.
    obj = extract_json(text)
    if obj and field_name in obj:
        return str(obj[field_name])

    # Strategy 2: regex patterns, most specific first. The field name is
    # escaped so names like "Φ(state_before)" don't inject capture groups
    # and shift group(1) onto the wrong text.
    name = re.escape(field_name)
    patterns = [
        rf'"{name}"\s*:\s*"((?:[^"\\]|\\.)*)"',      # "field": "quoted value"
        rf'"{name}"\s*:\s*(\d+\.?\d*)',              # "field": 123.4
        rf'\*?\*?{name}\*?\*?\s*:\s*(.+?)(?:\n|$)',  # **field:** value
        rf'{name}\s*[=:]\s*(.+?)(?:\n|$)',           # field = value
    ]
    for pattern in patterns:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            return m.group(1).strip().strip('"').strip("'")

    return default
|
|
|
|
def extract_number(text: str, field_name: str, default: float = 0.0) -> float:
    """
    Extract a numeric field from LLM text.

    Tries extract_field() first (JSON / labeled formats), then falls back
    to a direct "field: 1.23" regex scan. Returns `default` when nothing
    parses as a number.
    """
    val = extract_field(text, field_name)
    if val:
        try:
            # Drop trailing punctuation that often trails a number in prose.
            return float(val.rstrip('.,'))
        except (ValueError, TypeError):
            pass

    # Fallback: direct numeric scan. Escape the name so regex
    # metacharacters (e.g. "Φ(state_after)") are matched literally instead
    # of shifting the capture group.
    m = re.search(rf'{re.escape(field_name)}\s*[=:]\s*([\d.]+)', text, re.IGNORECASE)
    if m:
        try:
            return float(m.group(1).rstrip('.'))
        except ValueError:
            pass

    return default
|
|
|
|
def extract_code(text: str) -> str:
    """
    Extract Python code from LLM text.

    Handles:
    - Code in ``` blocks
    - Code in "code" JSON field
    - Raw code with def/class keywords

    Returns "" when nothing code-like is found.
    """
    # Strategy 1: JSON payloads — either nested under action.params.code
    # (the actor-response shape) or a top-level "code" field.
    obj = extract_json(text)
    if obj:

        action = obj.get("action", {})
        if isinstance(action, dict):
            params = action.get("params", {})
            if isinstance(params, dict) and "code" in params:
                return params["code"]
        if "code" in obj:
            return obj["code"]

    # Strategy 2: fenced code block (```python ... ``` or bare ```).
    m = re.search(r'```(?:python)?\s*\n(.*?)```', text, re.DOTALL)
    if m:
        return m.group(1).strip()

    # Strategy 3: heuristic line scan. Start collecting at the first
    # def/class/import/from line; keep indented lines, comments, blank
    # lines and further top-level definitions; stop at the first
    # prose-looking line once some code has been collected.
    lines = text.split('\n')
    code_lines = []
    in_code = False
    for line in lines:
        if re.match(r'^(def |class |import |from )', line.strip()):
            in_code = True
        if in_code:
            # Blank line that does not directly follow a block-opening
            # ':' line — kept as an intra-code separator.
            if line.strip() == '' and code_lines and not code_lines[-1].strip().endswith(':'):

                code_lines.append(line)
            # Indented continuation, or a line that itself looks like code
            # (def/class/import/from/comment/empty — note the `$`
            # alternative matches the empty string).
            elif in_code and (line.startswith(' ') or line.startswith('\t') or
                              re.match(r'^(def |class |import |from |#|$)', line.strip())):
                code_lines.append(line)
            # NOTE(review): this branch looks unreachable — def/class lines
            # are already accepted by the elif above. Left untouched.
            elif re.match(r'^(def |class )', line.strip()):
                code_lines.append(line)
            else:
                # First non-code line after collecting something: stop.
                if code_lines:
                    break

    if code_lines:
        return '\n'.join(code_lines).strip()

    return ""
|
|
|
|
def parse_actor_response(text: str) -> dict[str, Any]:
    """
    Parse an actor's response into thought/action/expected_delta.
    Works with any format the LLM produces.

    Returns a dict with keys "thought", "action" ({"name", "params"})
    and "expected_delta" — always present, possibly empty/"UNKNOWN".
    """
    # Best case: well-formed JSON already in (roughly) the right shape.
    obj = extract_json(text)
    if obj and ("action" in obj or "thought" in obj):
        return obj

    # Fall back to field-by-field extraction from free-form text.
    thought = extract_field(text, "thought")
    expected_delta = extract_field(text, "expected_delta")

    # The action name may appear under "name" or "action"; a leading brace
    # means we grabbed a JSON fragment rather than a name, so discard it.
    action_name = extract_field(text, "name", "")
    if not action_name:
        action_name = extract_field(text, "action", "")
    if action_name and action_name.startswith("{"):
        action_name = ""

    code = extract_code(text)

    action: dict[str, Any] = {"name": action_name or "UNKNOWN", "params": {}}
    if code:
        # Code present but no explicit action name found: assume a code
        # submission. (The original `action.get("name", "submit_code")`
        # could never yield "submit_code" because the "name" key always
        # exists — fixed to an explicit assignment.)
        if action["name"] == "UNKNOWN":
            action["name"] = "submit_code"
        action["params"]["code"] = code

    if not thought:
        # Best effort: treat the first line of raw text as the thought.
        thought = text.split('\n')[0][:200] if text else ""

    return {
        "thought": thought,
        "action": action,
        "expected_delta": expected_delta or "",
    }
|
|
|
|
def parse_critic_response(text: str) -> dict[str, Any]:
    """
    Parse a critic's response into phi_before/phi_after/reasoning/evidence/confidence.
    Works with any format.

    Scores are clamped to [0, 10] and confidence to [0, 1] on every path
    (the original only clamped the free-text path). Non-numeric values
    fall back to defaults instead of raising — this parser must never
    crash on model output.
    """
    def _as_float(value: Any, fallback: float) -> float:
        # Models emit non-numeric junk ("high", "...") — coerce safely.
        try:
            return float(value)
        except (TypeError, ValueError):
            return fallback

    # Best case: JSON with the expected score fields.
    obj = extract_json(text)
    if obj and ("phi_before" in obj or "phi_after" in obj):
        return {
            "phi_before": min(10.0, max(0.0, _as_float(obj.get("phi_before", 0), 0.0))),
            "phi_after": min(10.0, max(0.0, _as_float(obj.get("phi_after", 0), 0.0))),
            "reasoning": str(obj.get("reasoning", "")),
            "evidence": str(obj.get("evidence", "")),
            "confidence": min(1.0, max(0.0, _as_float(obj.get("confidence", 0.5), 0.5))),
        }

    # Fallback: pull numbers out of labeled text; try several spellings.
    # 0.0 doubles as the "not found" sentinel, as in the original.
    phi_before = extract_number(text, "phi_before", 0.0)
    if phi_before == 0.0:
        phi_before = extract_number(text, "Φ(state_before)", 0.0)
    if phi_before == 0.0:
        phi_before = extract_number(text, "state_before", 0.0)

    phi_after = extract_number(text, "phi_after", 0.0)
    if phi_after == 0.0:
        phi_after = extract_number(text, "Φ(state_after)", 0.0)
    if phi_after == 0.0:
        phi_after = extract_number(text, "state_after", 0.0)

    # Last resort: generic "score: N" mentions — two scores are read as
    # before/after, a single score as the after value.
    if phi_before == 0.0 and phi_after == 0.0:
        scores = re.findall(r'(?:score|SCORE|Score)\s*[=:]\s*([\d.]+)', text)
        if len(scores) >= 2:
            phi_before = _as_float(scores[0].rstrip('.'), 0.0)
            phi_after = _as_float(scores[1].rstrip('.'), 0.0)
        elif len(scores) == 1:
            phi_after = _as_float(scores[0].rstrip('.'), 0.0)

    reasoning = extract_field(text, "reasoning")
    evidence = extract_field(text, "evidence")
    confidence = extract_number(text, "confidence", 0.5)

    # Guarantee non-empty reasoning/evidence slices from the raw text.
    if not reasoning:
        reasoning = text[:300]
    if not evidence:
        evidence = text[300:500] if len(text) > 300 else ""

    return {
        "phi_before": min(10.0, max(0.0, phi_before)),
        "phi_after": min(10.0, max(0.0, phi_after)),
        "reasoning": reasoning,
        "evidence": evidence,
        "confidence": min(1.0, max(0.0, confidence)),
    }
|
|
|
|
def parse_optimizer_response(text: str) -> dict[str, Any]:
    """
    Parse optimizer output into a heuristics list.

    Tried in order: a JSON object already containing "heuristics", a bare
    JSON array, paired pattern/strategy labeled lines, and finally the
    first five numbered list items.
    """
    parsed = extract_json(text)
    if parsed and "heuristics" in parsed:
        return parsed

    # A bare JSON array counts as the heuristics list itself.
    array_match = re.search(r'\[.*\]', text, re.DOTALL)
    if array_match:
        try:
            candidate = json.loads(array_match.group())
        except (json.JSONDecodeError, ValueError):
            candidate = None
        if isinstance(candidate, list):
            return {"heuristics": candidate}

    # Pair up "pattern: ..." style lines with "strategy: ..." style lines.
    found_patterns = re.findall(r'(?:pattern|when|if)\s*[:\-]\s*(.+?)(?:\n|$)', text, re.IGNORECASE)
    found_strategies = re.findall(r'(?:strategy|do|then|action)\s*[:\-]\s*(.+?)(?:\n|$)', text, re.IGNORECASE)
    result = [
        {"tier": "strategic", "pattern": pat.strip(), "strategy": strat.strip()}
        for pat, strat in zip(found_patterns, found_strategies)
    ]

    # Last resort: read numbered list items as standalone strategies.
    if not result:
        numbered = re.findall(r'\d+\.\s*(.+?)(?:\n|$)', text)
        result = [
            {"tier": "strategic", "pattern": "General", "strategy": entry.strip()}
            for entry in numbered[:5]
        ]

    return {"heuristics": result}
|
|