Spaces:
Sleeping
Sleeping
| # server/analytics_engine.py | |
| """ | |
| Unified Analytics Engine — v4.0 | |
| Aggregates ALL scoring dimensions into a single research-grade report. | |
| Produces: | |
| - Reasoning graph (structured DAG of the agent's decision process) | |
| - Root cause analysis (why the agent failed at every level) | |
| - Decision efficiency score | |
| - Overall AI reliability profile (radar chart data) | |
| - Paper-ready JSON suitable for arXiv submission | |
| This module is the "top of the stack" — it calls all other engines | |
| and synthesizes their outputs into one authoritative report. | |
| """ | |
| from __future__ import annotations | |
| import time | |
| import json | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from dataclasses import dataclass, field | |
@dataclass
class ReasoningNode:
    """One node in the agent's reconstructed reasoning graph.

    Declared as a dataclass so it can be built with keyword arguments,
    which is how ``AnalyticsEngine._build_reasoning_graph`` constructs it.
    """

    node_id: str
    step_number: int
    action_type: str
    target: Optional[str]  # file path or search query
    reward: float
    was_useful: bool
    connected_to: List[str]  # IDs of the nodes this node is linked to
    label: str  # Human-readable description
@dataclass
class ReasoningGraph:
    """
    A directed graph reconstruction of the agent's thought process.

    Nodes = actions taken.
    Edges = "built on" relationships (e.g., write followed a read = used info from read).
    Clusters = logical reasoning phases (Exploration, Hypothesis, Verification, Commit)

    Declared as a dataclass so it can be built with keyword arguments.
    """

    nodes: List[ReasoningNode]
    phases: Dict[str, List[str]]  # phase_name -> [node_ids]
    critical_path: List[str]  # node_ids on the most impactful path
    wasted_nodes: List[str]  # node_ids that contributed nothing
    optimal_path_comparison: Optional[str]  # What should the agent have done

    def to_dict(self) -> dict:
        """Return a plain-dict view of the graph.

        Each node is flattened to a dict with the keys ``id``, ``step``,
        ``action``, ``target``, ``reward`` (rounded to 3 decimals),
        ``useful``, ``connects_to`` and ``label``. The remaining fields are
        passed through by reference (not copied).
        """
        return {
            "nodes": [
                {
                    "id": n.node_id,
                    "step": n.step_number,
                    "action": n.action_type,
                    "target": n.target,
                    "reward": round(n.reward, 3),
                    "useful": n.was_useful,
                    "connects_to": n.connected_to,
                    "label": n.label,
                }
                for n in self.nodes
            ],
            "phases": self.phases,
            "critical_path": self.critical_path,
            "wasted_nodes": self.wasted_nodes,
            "optimal_path": self.optimal_path_comparison,
        }
@dataclass
class AnalyticsReport:
    """
    The master analytics report — synthesizes all evaluation dimensions.

    Paper-ready, structured for research publication or leaderboard submission.
    Declared as a dataclass so it can be built with keyword arguments.
    """

    report_id: str
    episode_id: str
    task: str
    variant_id: str
    generated_at: float

    # Dimension scores (0.0–1.0 each)
    correctness_score: float  # Did it fix the bug?
    causal_score: float  # Did it understand WHY?
    robustness_score: float  # Is the strategy resilient?
    calibration_score: float  # Was it appropriately confident?
    reliability_index: float  # Weighted multi-dim score
    generalization_hint: float  # Based on strategy (robust strategies generalize better)
    decision_efficiency: float  # Score / Steps ratio (normalized)
    process_quality: float  # How structured was the reasoning process?

    # Composite
    composite_score: float  # Weighted aggregate of all dimensions

    # Graph
    reasoning_graph: ReasoningGraph

    # Root cause trees
    failure_root_causes: List[Dict]  # Each: {depth, cause, effect, remediation}

    # Alternative path analysis
    what_agent_did: List[str]
    what_agent_should_have_done: List[str]
    steps_wasted: int
    steps_optimal: int

    # Profile tags
    profile_tags: List[str]  # e.g., ["OVERCONFIDENT", "SHORTCUT_LEARNER", "WELL_CALIBRATED"]

    # Executive summary
    executive_summary: str
    researcher_notes: str  # More technical deep dive

    def to_dict(self) -> dict:
        """Return the report as a nested dict of plain values.

        All scores are rounded to 3 decimals under ``dimension_scores``; the
        reasoning graph is serialized through its own ``to_dict()``.
        """
        return {
            "report_id": self.report_id,
            "episode_id": self.episode_id,
            "task": self.task,
            "variant_id": self.variant_id,
            "generated_at": self.generated_at,
            "dimension_scores": {
                "correctness": round(self.correctness_score, 3),
                "causal_reasoning": round(self.causal_score, 3),
                "robustness": round(self.robustness_score, 3),
                "calibration": round(self.calibration_score, 3),
                "reliability_index": round(self.reliability_index, 3),
                "generalization": round(self.generalization_hint, 3),
                "decision_efficiency": round(self.decision_efficiency, 3),
                "process_quality": round(self.process_quality, 3),
                "composite": round(self.composite_score, 3),
            },
            "reasoning_graph": self.reasoning_graph.to_dict(),
            "failure_root_causes": self.failure_root_causes,
            "alternative_paths": {
                "what_agent_did": self.what_agent_did,
                "optimal_path": self.what_agent_should_have_done,
                "steps_wasted": self.steps_wasted,
                "steps_optimal": self.steps_optimal,
            },
            "profile_tags": self.profile_tags,
            "executive_summary": self.executive_summary,
            "researcher_notes": self.researcher_notes,
        }

    def render_text(self) -> str:
        """Render a human-readable analytics report.

        Returns a newline-joined string containing the score bars, the
        profile tags (if any), the executive summary, up to three root
        causes, a side-by-side comparison of the first five agent vs.
        optimal steps, and the researcher notes (if non-empty).
        """

        def bar(v: float, width: int = 20) -> str:
            # Clamp so an out-of-range score can neither overflow the bar
            # nor produce a negative repeat count.
            filled = int(min(1.0, max(0.0, v)) * width)
            return "█" * filled + "░" * (width - filled)

        rule = "═" * 70
        score_rows = [
            ("Correctness", self.correctness_score),
            ("Causal Reasoning", self.causal_score),
            ("Robustness", self.robustness_score),
            ("Calibration", self.calibration_score),
            ("Reliability", self.reliability_index),
            ("Decision Effic.", self.decision_efficiency),
            ("Process Quality", self.process_quality),
        ]

        lines = [
            rule,
            f"  📊 ANALYTICS ENGINE REPORT — {self.task} | {self.variant_id}",
            f"  Episode: {self.episode_id}",
            rule,
            "",
            "┌─ DIMENSION SCORES " + "─" * 50,
        ]
        lines += [
            f"│  {label:<18}[{bar(value)}] {value:.3f}"
            for label, value in score_rows
        ]
        lines += [
            f"│  {'─' * 60}",
            f"│  {'COMPOSITE':<18}[{bar(self.composite_score)}] {self.composite_score:.3f}",
            "└" + "─" * 69,
            "",
        ]

        if self.profile_tags:
            lines.append(f"🏷️  Profile: {' | '.join(self.profile_tags)}")
            lines.append("")

        lines += [
            "📋 Executive Summary",
            f"   {self.executive_summary}",
            "",
        ]

        if self.failure_root_causes:
            lines.append("💥 Failure Root Cause Analysis")
            # Only the first three causes are shown to keep the report short.
            for rc in self.failure_root_causes[:3]:
                lines.append(f"   Cause:  {rc.get('cause')}")
                lines.append(f"   Effect: {rc.get('effect')}")
                lines.append(f"   Fix:    {rc.get('remediation')}")
                lines.append("")

        lines += [
            "🗺️  What Agent Did vs Optimal",
            f"   Steps taken: {len(self.what_agent_did)} | "
            f"Steps optimal: {self.steps_optimal} | Wasted: {self.steps_wasted}",
        ]
        # zip() stops at the shorter list, so only paired steps are compared.
        for did, best in zip(
            self.what_agent_did[:5],
            self.what_agent_should_have_done[:5],
        ):
            marker = "✓" if did == best else "✗"
            lines.append(f"  {marker} Agent:   {did}")
            lines.append(f"    Optimal: {best}")
        lines.append("")

        if self.researcher_notes:
            lines += ["🔬 Researcher Notes", f"   {self.researcher_notes}", ""]

        lines.append(rule)
        return "\n".join(lines)
class AnalyticsEngine:
    """
    Master analytics engine — integrates all evaluation modules.

    Call .analyze() after an episode to get the full AnalyticsReport.
    """

    # Baseline generalization estimate per detected strategy; strategies not
    # listed here fall back to 0.5 in analyze().
    _STRATEGY_GENERALIZATION = {
        "TARGETED_DEBUGGING": 0.75,
        "SYSTEMATIC_SEARCH": 0.70,
        "SPEC_DRIVEN": 0.80,
        "BRUTE_FORCE": 0.40,
        "RANDOM_EXPLORATION": 0.30,
        "MINIMAL_EFFORT": 0.20,
    }

    def analyze(
        self,
        env,
        causal_report=None,
        counterfactual_report=None,
        calibration_report=None,
        advanced_metrics=None,
        failure_report=None,
        strategy_report=None,
    ) -> AnalyticsReport:
        """
        Synthesize all evaluation outputs into one AnalyticsReport.

        Each sub-report is optional; when one is None the matching engine
        from the ``server`` package is imported lazily and run on the
        episode data taken from ``env``.

        Args:
            env: Episode environment. Read here: ``get_trajectory()``,
                ``variant`` (and its ``meta``), ``current_task``,
                ``final_score``, ``files_read`` and ``files_written``.
            causal_report: Optional precomputed causal-probe report.
            counterfactual_report: Optional precomputed counterfactual report.
            calibration_report: Optional precomputed calibration report.
            advanced_metrics: Optional precomputed advanced-metrics result.
            failure_report: Optional precomputed failure classification.
            strategy_report: Optional precomputed strategy detection.

        Returns:
            A fully populated AnalyticsReport.
        """
        import uuid

        traj = env.get_trajectory()
        steps = traj.get("steps", []) if traj else []
        meta = env.variant.meta if env.variant else {}
        episode_id = traj.get("episode_id", "unknown") if traj else "unknown"
        variant_id = traj.get("variant_id", "unknown") if traj else "unknown"
        task = env.current_task or "unknown"
        final_score = env.final_score
        files_read = list(env.files_read)
        files_written = list(env.files_written)

        # ── Run sub-engines if reports not provided ──────────────────────
        if causal_report is None:
            from server.causal_probe import CausalProbe
            causal_report = CausalProbe().probe(
                episode_id, task, steps, meta, files_read, files_written, final_score
            )
        if counterfactual_report is None:
            from server.counterfactual_engine import CounterfactualEngine
            counterfactual_report = CounterfactualEngine().analyze(
                episode_id, task, steps, meta, files_read, files_written, final_score
            )
        if calibration_report is None:
            from server.confidence_calibrator import ConfidenceCalibrator
            calibration_report = ConfidenceCalibrator().calibrate(
                episode_id, task, steps, final_score
            )
        if advanced_metrics is None:
            from server.advanced_metrics import AdvancedMetricsEngine
            advanced_metrics = AdvancedMetricsEngine().compute(
                steps, meta, final_score, files_read, files_written
            )
        if failure_report is None:
            from server.failure_classifier import FailureClassifier
            failure_report = FailureClassifier().classify(
                episode_id, task, steps, meta, files_read, files_written, final_score
            )
        if strategy_report is None:
            from server.strategy_detector import StrategyDetector
            strategy_report = StrategyDetector().detect(
                steps, task, meta, files_read, final_score
            )

        # ── Compute derived scores ───────────────────────────────────────
        causal_score = causal_report.causal_score
        robustness_score = counterfactual_report.robustness_score
        calibration_score = calibration_report.calibration_score
        reliability_index = advanced_metrics.reliability_index
        correctness_score = final_score

        # Real number of steps taken; may be 0 for an empty trajectory.
        step_count = len(steps)
        # "or 20" also covers an explicit None/0 stored in the metadata.
        max_steps_possible = meta.get("max_steps") or 20
        decision_efficiency = self._decision_efficiency(
            final_score, step_count, max_steps_possible
        )

        # Process quality: three boolean habits plus chain coverage,
        # weighted so the maximum is 1.0.
        process_quality = (
            (0.25 if causal_report.read_before_write else 0.0)
            + (0.25 if causal_report.submit_after_test else 0.0)
            + (0.20 if causal_report.search_before_navigate else 0.0)
            + causal_report.actual_chain_coverage * 0.30
        )

        # Generalization hint: mean of the strategy baseline and robustness.
        strategy_baseline = self._STRATEGY_GENERALIZATION.get(
            strategy_report.strategy, 0.5
        )
        generalization_hint = (strategy_baseline + robustness_score) / 2

        # Composite (weights sum to 1.0; generalization_hint is not included).
        composite_score = (
            correctness_score * 0.30
            + causal_score * 0.20
            + robustness_score * 0.15
            + calibration_score * 0.12
            + reliability_index * 0.10
            + process_quality * 0.08
            + decision_efficiency * 0.05
        )

        # ── Build reasoning graph ────────────────────────────────────────
        reasoning_graph = self._build_reasoning_graph(
            steps, meta, files_read, files_written
        )

        # ── Root cause analysis ──────────────────────────────────────────
        root_causes = self._build_root_cause_tree(
            failure_report, causal_report, calibration_report, final_score
        )

        # ── Alternative path analysis ────────────────────────────────────
        what_did = [
            f"{s.get('action_type')} {s.get('action_path') or s.get('action_query') or ''}".strip()
            for s in steps
        ]
        optimal = self._compute_optimal_path(meta, files_read, files_written, final_score)
        steps_wasted = max(0, step_count - len(optimal))

        # ── Profile tags ─────────────────────────────────────────────────
        tags = []
        calibration_profile = calibration_report.profile.value
        brittleness = counterfactual_report.brittleness_level.value
        if calibration_profile != "WELL_CALIBRATED":
            tags.append(calibration_profile)
        if causal_report.shortcut_learning_detected:
            tags.append("SHORTCUT_LEARNER")
        if causal_report.false_confidence_detected:
            tags.append("FALSE_CONFIDENCE")
        if brittleness in ("BRITTLE", "FRAGILE"):
            tags.append(f"BRITTLE_STRATEGY_{brittleness}")
        if causal_report.understanding_level.value == "DEEP":
            tags.append("DEEP_REASONER")
        if strategy_report.strategy == "TARGETED_DEBUGGING":
            tags.append("TARGETED_DEBUGGER")
        if not tags:
            tags.append("TYPICAL")

        # ── Executive summary ────────────────────────────────────────────
        executive_summary = " ".join([
            f"Agent scored {final_score:.2f} on {task}.",
            f"Causal understanding: {causal_report.understanding_level.value} ({causal_score:.2f}).",
            f"Strategy: {strategy_report.strategy} (robustness: {robustness_score:.2f}).",
            f"Confidence calibration: {calibration_profile} (error: {calibration_report.expected_calibration_error:.2f}).",
            f"Composite reliability: {composite_score:.2f}.",
        ])

        # ── Researcher notes ─────────────────────────────────────────────
        researcher_notes = (
            f"Observed {step_count} steps ({steps_wasted} wasted vs estimated {len(optimal)} optimal). "
            f"Chain coverage: {causal_report.actual_chain_coverage:.0%}. "
            f"Chain order score: {causal_report.chain_order_score:.2f}. "
            f"Counterfactual mutations survived: {counterfactual_report.mutations_survived}/{len(counterfactual_report.mutations_tested)}. "
            f"Expected calibration error: {calibration_report.expected_calibration_error:.3f}. "
            f"Decision efficiency: {decision_efficiency:.3f}. "
            f"Process quality: {process_quality:.3f}."
        )

        return AnalyticsReport(
            report_id=f"ar_{uuid.uuid4().hex[:10]}",
            episode_id=episode_id,
            task=task,
            variant_id=variant_id,
            generated_at=time.time(),
            correctness_score=correctness_score,
            causal_score=causal_score,
            robustness_score=robustness_score,
            calibration_score=calibration_score,
            reliability_index=reliability_index,
            generalization_hint=generalization_hint,
            decision_efficiency=decision_efficiency,
            process_quality=process_quality,
            composite_score=composite_score,
            reasoning_graph=reasoning_graph,
            failure_root_causes=root_causes,
            what_agent_did=what_did,
            what_agent_should_have_done=optimal,
            steps_wasted=steps_wasted,
            steps_optimal=len(optimal),
            profile_tags=tags,
            executive_summary=executive_summary,
            researcher_notes=researcher_notes,
        )

    @staticmethod
    def _decision_efficiency(final_score: float, step_count: int, max_steps: float) -> float:
        """Return the final score discounted by step usage, capped at 1.0.

        The score is divided by ``steps / (max_steps / 3)`` once the agent
        has used more than a third of its step budget; below that threshold
        the score passes through unchanged.
        """
        budget = max(1, max_steps / 3)
        penalty = max(1.0, max(step_count, 1) / budget)
        return min(1.0, final_score / penalty)

    @staticmethod
    def _format_optimal_path(files_read: List[str], bug_files: List[str]) -> str:
        """Return the arrow-joined "ideal" action sequence shown on the graph.

        Picks at most one test file and two non-test files (by basename) from
        ``files_read`` followed by ``bug_files``, then appends the fixed
        write/test/submit tail. Duplicates are dropped while keeping
        first-seen order, so the result is deterministic.
        """
        candidates = list(dict.fromkeys([*files_read, *bug_files]))
        test_files = [f for f in candidates if "test" in f.lower()]
        src_files = [f for f in candidates if "test" not in f.lower()]
        actions = [
            f"read_file({f.split('/')[-1]})"
            for f in test_files[:1] + src_files[:2]
        ]
        actions += ["write_file(src)", "run_tests", "submit"]
        return " → ".join(actions)

    def _build_reasoning_graph(
        self,
        steps: List[dict],
        meta: dict,
        files_read: List[str],
        files_written: List[str],
    ) -> ReasoningGraph:
        """Build a DAG from the trajectory steps.

        Args:
            steps: Step dicts; the keys read are ``step_number``,
                ``action_type``, ``action_path``, ``action_query`` and
                ``reward``.
            meta: Variant metadata; ``bug_files`` and ``files_to_implement``
                are combined into the set of relevant files.
            files_read: Files the agent read (used for the optimal-path hint).
            files_written: Accepted for signature compatibility; not used.

        Returns:
            A ReasoningGraph with one node per step, nodes grouped by phase
            (empty phases omitted), the critical path, the wasted nodes and
            a formatted optimal-path string.
        """
        # Ordered and de-duplicated so the derived output is deterministic;
        # "or []" tolerates explicit None values in the metadata.
        bug_file_list = list(dict.fromkeys(
            [*(meta.get("bug_files") or []), *(meta.get("files_to_implement") or [])]
        ))
        bug_files = set(bug_file_list)

        # Any action type not listed here falls into the "Commit" phase.
        phase_by_action = {
            "read_file": "Exploration",
            "search_code": "Exploration",
            "write_file": "Hypothesis",
            "run_tests": "Verification",
        }

        nodes: List[ReasoningNode] = []
        phases: Dict[str, List[str]] = {
            "Exploration": [], "Hypothesis": [], "Verification": [], "Commit": []
        }
        last_useful_node_id: Optional[str] = None

        for s in steps:
            step_number = s.get("step_number", len(nodes) + 1)
            node_id = f"n{step_number}"
            atype = s.get("action_type", "unknown")
            target = s.get("action_path") or s.get("action_query")
            reward = s.get("reward") or 0.0  # treat a missing/None reward as 0

            # Useful = rewarded, or a search/test run, or a read of a relevant file.
            was_useful = (
                reward > 0
                or atype in ("search_code", "run_tests")
                or (atype == "read_file" and target in bug_files)
            )

            short_target = target.split("/")[-1][:20] if target else ""
            label = f"{atype}({short_target})" if short_target else atype

            # NOTE(review): this links a useful node back to the previous
            # useful node, while ReasoningNode documents connected_to as
            # *subsequent* nodes. Direction kept as-is; confirm with consumers.
            connects_to = (
                [last_useful_node_id] if was_useful and last_useful_node_id else []
            )

            nodes.append(ReasoningNode(
                node_id=node_id,
                step_number=step_number,
                action_type=atype,
                target=target,
                reward=reward,
                was_useful=was_useful,
                connected_to=connects_to,
                label=label,
            ))
            phases[phase_by_action.get(atype, "Commit")].append(node_id)
            if was_useful:
                last_useful_node_id = node_id

        # Critical path: nodes with positive reward, plus every submit.
        critical_path = [
            n.node_id for n in nodes if n.reward > 0 or n.action_type == "submit"
        ]
        wasted_nodes = [
            n.node_id for n in nodes if not n.was_useful and n.action_type != "submit"
        ]

        return ReasoningGraph(
            nodes=nodes,
            phases={name: ids for name, ids in phases.items() if ids},
            critical_path=critical_path,
            wasted_nodes=wasted_nodes,
            optimal_path_comparison=self._format_optimal_path(files_read, bug_file_list),
        )

    def _build_root_cause_tree(
        self, failure_report, causal_report, calibration_report, final_score: float
    ) -> List[Dict]:
        """Build a structured root cause tree.

        Returns a flat list of dicts with the keys ``depth``, ``cause``,
        ``effect`` and ``remediation``:

        - "primary" entries come from the first three classified failures,
          or, when there are none and the score is below 0.5, from a single
          low-score entry;
        - "secondary" entries come from the first two guessing indicators of
          the causal report and from an OVERCONFIDENT calibration profile.

        Any of the three reports may be None.
        """
        causes = []

        if failure_report and failure_report.failures:
            for failure in failure_report.failures[:3]:
                causes.append({
                    "depth": "primary",
                    # Failures may be plain values rather than rich objects.
                    "cause": getattr(failure, "failure_type", str(failure)),
                    "effect": getattr(failure, "evidence", "unknown"),
                    "remediation": getattr(failure, "remediation", "See improvement plan"),
                })
        elif final_score < 0.5:
            causes.append({
                "depth": "primary",
                "cause": failure_report.primary_failure if failure_report else "LOW_SCORE",
                "effect": f"Final score only {final_score:.2f} — bug not adequately fixed",
                "remediation": "Use test-first navigation and verify with run_tests",
            })

        if causal_report and causal_report.guessing_indicators:
            recommendations = causal_report.recommendations
            remediation = recommendations[0] if recommendations else ""
            for indicator in causal_report.guessing_indicators[:2]:
                causes.append({
                    "depth": "secondary",
                    "cause": "CAUSAL_GAP",
                    "effect": indicator,
                    "remediation": remediation,
                })

        if calibration_report and calibration_report.profile.value == "OVERCONFIDENT":
            causes.append({
                "depth": "secondary",
                "cause": "OVERCONFIDENCE",
                "effect": (
                    f"Inferred confidence {calibration_report.inferred_confidence:.2f} "
                    f"vs actual {calibration_report.actual_performance:.2f}"
                ),
                "remediation": "Read more before committing. Verify with tests.",
            })

        return causes

    def _compute_optimal_path(
        self, meta: dict, files_read: List[str], files_written: List[str], score: float
    ) -> List[str]:
        """Suggest what the optimal action sequence would have been.

        Reads one test file (the first one the agent read, else
        ``tests/test_main.py``), then up to two bug files from ``meta``
        (``bug_files``, else ``files_to_implement``, else ``src/main.py``),
        followed by a fixed search/write/test/submit tail.

        ``files_written`` and ``score`` are accepted but not used.
        """
        test_files = [f for f in files_read if "test" in f.lower()]
        bug_files = meta.get("bug_files", []) or meta.get("files_to_implement", [])

        path = [f"read_file {tf}" for tf in (test_files or ["tests/test_main.py"])[:1]]
        path += [f"read_file {bf}" for bf in (bug_files or ["src/main.py"])[:2]]
        path += [
            "search_code <function_name>",
            "write_file <targeted_fix>",
            "run_tests",
            "submit",
        ]
        return path