| """ |
| Benchmark Runner — Runs both pipelines on HotpotQA and evaluates |
| ================================================================= |
| """ |
| import json |
| import logging |
| from typing import Dict, List |
| from .layers.orchestration_layer import InferenceOrchestrator |
| from .layers.evaluation_layer import EvaluationLayer, EvalSample |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class BenchmarkRunner:
    """Runs benchmarks on HotpotQA and generates comparison metrics.

    For each HotpotQA question both pipelines are executed through the
    orchestrator's ``run_comparison`` and the resulting answer pair is
    scored by the evaluator.  Per-query result dicts accumulate in
    ``self.benchmark_results`` across calls (they are never reset here).
    """

    def __init__(self, orchestrator, evaluator):
        """
        Args:
            orchestrator: Object exposing ``run_comparison(query, passages,
                top_k, hops)`` (see InferenceOrchestrator).
            evaluator: Object exposing ``evaluate_sample``,
                ``compute_aggregate_metrics`` and ``generate_report``
                (see EvaluationLayer).
        """
        self.orchestrator = orchestrator
        self.evaluator = evaluator
        # One result dict per successfully evaluated query, across all runs.
        self.benchmark_results: List[Dict] = []

    def run_hotpotqa_benchmark(self, num_samples=100, split="validation",
                               top_k=5, hops=2, progress_callback=None):
        """Run both pipelines on HotpotQA and evaluate.

        Args:
            num_samples: Maximum number of dataset rows to process.
            split: HuggingFace dataset split to load.
            top_k: Retrieval depth forwarded to ``run_comparison``.
            hops: Graph-traversal hop count forwarded to ``run_comparison``.
            progress_callback: Optional ``fn(done, total, result_dict)``
                invoked after each completed query.

        Returns:
            Dict with per-query ``results``, ``aggregate`` metrics, a text
            ``report``, and ``num_completed`` / ``num_requested`` counts.
            Queries that raise are logged and skipped, so ``num_completed``
            may be less than ``num_requested``.
        """
        # Deferred import: `datasets` is a heavy optional dependency only
        # needed when the benchmark is actually run.
        from datasets import load_dataset
        logger.info(f"Loading HotpotQA ({split}, n={num_samples})...")
        ds = load_dataset("hotpotqa/hotpot_qa", "distractor", split=split)

        results = []
        for idx in range(min(num_samples, len(ds))):
            row = ds[idx]
            query, gold = row["question"], row["answer"]
            qtype = row.get("type", "unknown")
            level = row.get("level", "unknown")

            # Hoist the per-row context columns once; they are reused by both
            # the passage flattening and the supporting-fact resolution below.
            ctx_titles = row["context"]["title"]
            ctx_sentences = row["context"]["sentences"]

            # Flatten each context paragraph into "title: joined sentences".
            passages = [f"{t}: {' '.join(s)}"
                        for t, s in zip(ctx_titles, ctx_sentences)]

            # Resolve supporting-fact (title, sent_id) pairs back to the
            # actual sentence text; out-of-range sent_ids are skipped.
            sf = []
            for t, si in zip(row["supporting_facts"]["title"],
                             row["supporting_facts"]["sent_id"]):
                for ct, cs in zip(ctx_titles, ctx_sentences):
                    if ct == t and si < len(cs):
                        sf.append(cs[si])

            try:
                comp = self.orchestrator.run_comparison(query, passages, top_k, hops)

                sample = EvalSample(
                    query=query, reference_answer=gold,
                    baseline_answer=comp.baseline.answer,
                    graphrag_answer=comp.graphrag.answer,
                    baseline_contexts=comp.baseline.contexts,
                    graphrag_contexts=comp.graphrag.contexts,
                    question_type=qtype, difficulty=str(level),
                    supporting_facts=sf)

                er = self.evaluator.evaluate_sample(
                    sample,
                    comp.baseline.total_tokens, comp.graphrag.total_tokens,
                    comp.baseline.cost_usd, comp.graphrag.cost_usd,
                    comp.baseline.latency_ms, comp.graphrag.latency_ms)

                rd = {
                    "idx": idx, "query": query, "gold_answer": gold,
                    "question_type": qtype, "level": level,
                    "baseline_answer": comp.baseline.answer,
                    "graphrag_answer": comp.graphrag.answer,
                    "baseline_f1": er.baseline_f1, "graphrag_f1": er.graphrag_f1,
                    "baseline_em": er.baseline_em, "graphrag_em": er.graphrag_em,
                    "baseline_tokens": comp.baseline.total_tokens,
                    "graphrag_tokens": comp.graphrag.total_tokens,
                    "baseline_cost": comp.baseline.cost_usd,
                    "graphrag_cost": comp.graphrag.cost_usd,
                    "baseline_latency": comp.baseline.latency_ms,
                    "graphrag_latency": comp.graphrag.latency_ms,
                    "baseline_context_hit": er.baseline_context_hit,
                    "graphrag_context_hit": er.graphrag_context_hit,
                    "entities_found": len(comp.graphrag.entities_found),
                    "relations_traversed": len(comp.graphrag.relations_traversed),
                }
                results.append(rd)
                self.benchmark_results.append(rd)

                if progress_callback:
                    progress_callback(idx + 1, num_samples, rd)
                if (idx + 1) % 10 == 0:
                    logger.info("Processed %d/%d queries...", idx + 1, num_samples)

            except Exception as e:
                # Best-effort benchmark: record the failure (with traceback,
                # which logger.error would drop) and continue with next query.
                logger.exception("Error on query %d: %s", idx, e)

        aggregate = self.evaluator.compute_aggregate_metrics()
        report = self.evaluator.generate_report()
        return {"results": results, "aggregate": aggregate, "report": report,
                "num_completed": len(results), "num_requested": num_samples}

    def get_results_dataframe(self):
        """Return accumulated per-query results as a DataFrame (empty if none)."""
        import pandas as pd  # deferred: keep pandas optional at import time
        if not self.benchmark_results:
            return pd.DataFrame()
        return pd.DataFrame(self.benchmark_results)

    def save_results(self, filepath):
        """Write accumulated results plus aggregate metrics to *filepath* as JSON.

        Args:
            filepath: Destination path; overwritten if it exists.

        Note:
            ``default=str`` stringifies any non-JSON-serializable values
            (e.g. numpy scalars in the aggregate) instead of raising.
        """
        payload = {"results": self.benchmark_results,
                   "aggregate": self.evaluator.compute_aggregate_metrics()}
        # Explicit encoding so the JSON file is portable across platforms.
        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump(payload, f, indent=2, default=str)
|
|