"""
Benchmark Runner — Runs all 3 pipelines on HotpotQA and evaluates
==================================================================
Pipeline 1: LLM-Only (no retrieval)
Pipeline 2: Basic RAG (vector search + LLM)
Pipeline 3: GraphRAG (TigerGraph + novelty engine)
Evaluates with: F1, EM, LLM-as-a-Judge, BERTScore, Context Hit Rate
"""
import json
import logging
from typing import Callable, Dict, List, Optional

from .layers.orchestration_layer import InferenceOrchestrator
from .layers.evaluation_layer import EvaluationLayer, EvalSample

logger = logging.getLogger(__name__)
class BenchmarkRunner:
    """Runs HotpotQA benchmarks across all three pipelines and collects comparison metrics."""

    def __init__(self, orchestrator: InferenceOrchestrator, evaluator: EvaluationLayer):
        self.orchestrator = orchestrator
        self.evaluator = evaluator
        self.benchmark_results: List[Dict] = []
        self.eval_samples: List[EvalSample] = []
    def run_hotpotqa_benchmark(self, num_samples: int = 100, split: str = "validation",
                               top_k: int = 5, hops: int = 2,
                               progress_callback: Optional[Callable[[int, int, Dict], None]] = None,
                               run_judge: bool = True, run_bertscore: bool = True) -> Dict:
        """Run all three pipelines on HotpotQA and evaluate the answers."""
        # Imported here so `datasets` is only required when a benchmark actually runs.
        from datasets import load_dataset

        logger.info(f"Loading HotpotQA ({split}, n={num_samples})...")
        ds = load_dataset("hotpotqa/hotpot_qa", "distractor", split=split)
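        # In the "distractor" setting each question comes with 10 context
        # paragraphs (2 gold + 8 distractors), plus gold supporting-fact
        # sentence ids that are used below for the context hit rate.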
        results = []
        self.eval_samples = []
        for idx in range(min(num_samples, len(ds))):
            row = ds[idx]
            query, gold = row["question"], row["answer"]
            qtype = row.get("type", "unknown")
            level = row.get("level", "unknown")
            # Build one passage per context paragraph: "Title: <joined sentences>"
            passages = [f"{t}: {' '.join(s)}"
                        for t, s in zip(row["context"]["title"], row["context"]["sentences"])]
            # Extract supporting facts for context hit rate
            sf = []
            for t, si in zip(row["supporting_facts"]["title"], row["supporting_facts"]["sent_id"]):
                for ct, cs in zip(row["context"]["title"], row["context"]["sentences"]):
                    if ct == t and si < len(cs):
                        sf.append(cs[si])
                        break  # context titles are unique, so the first match is the only one
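            # `sf` now holds the raw gold sentences; the evaluator presumably
            # scores context hit rate by checking whether each one appears in
            # the contexts a pipeline retrieved.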
            try:
                # Run all 3 pipelines
                lo = self.orchestrator.run_llm_only(query)
                b = self.orchestrator.run_baseline_rag(query, passages, top_k)
                g = self.orchestrator.run_graphrag(query, passages, hops=hops)
                sample = EvalSample(
                    query=query, reference_answer=gold,
                    llm_only_answer=lo.answer,
                    baseline_answer=b.answer,
                    graphrag_answer=g.answer,
                    baseline_contexts=b.contexts,
                    graphrag_contexts=g.contexts,
                    question_type=qtype, difficulty=str(level),
                    supporting_facts=sf)
                self.eval_samples.append(sample)
                er = self.evaluator.evaluate_sample(
                    sample,
                    llm_only_tokens=lo.total_tokens,
                    baseline_tokens=b.total_tokens,
                    graphrag_tokens=g.total_tokens,
                    llm_only_cost=lo.cost_usd,
                    baseline_cost=b.cost_usd,
                    graphrag_cost=g.cost_usd,
                    llm_only_latency=lo.latency_ms,
                    baseline_latency=b.latency_ms,
                    graphrag_latency=g.latency_ms,
                    run_judge=run_judge,
                )
                rd = {
                    "idx": idx, "query": query, "gold_answer": gold,
                    "question_type": qtype, "level": level,
                    # Answers
                    "llm_only_answer": lo.answer,
                    "baseline_answer": b.answer,
                    "graphrag_answer": g.answer,
                    # F1 / EM
                    "llm_only_f1": er.llm_only_f1,
                    "baseline_f1": er.baseline_f1,
                    "graphrag_f1": er.graphrag_f1,
                    "llm_only_em": er.llm_only_em,
                    "baseline_em": er.baseline_em,
                    "graphrag_em": er.graphrag_em,
                    # LLM-as-Judge
                    "llm_only_judge": er.llm_only_judge,
                    "baseline_judge": er.baseline_judge,
                    "graphrag_judge": er.graphrag_judge,
                    # Tokens / Cost / Latency
                    "llm_only_tokens": lo.total_tokens,
                    "baseline_tokens": b.total_tokens,
                    "graphrag_tokens": g.total_tokens,
                    "llm_only_cost": lo.cost_usd,
                    "baseline_cost": b.cost_usd,
                    "graphrag_cost": g.cost_usd,
                    "llm_only_latency": lo.latency_ms,
                    "baseline_latency": b.latency_ms,
                    "graphrag_latency": g.latency_ms,
                    # Context
                    "baseline_context_hit": er.baseline_context_hit,
                    "graphrag_context_hit": er.graphrag_context_hit,
                    "entities_found": len(g.entities_found),
                    "relations_traversed": len(g.relations_traversed),
                }
                results.append(rd)
                self.benchmark_results.append(rd)
                if progress_callback:
                    progress_callback(idx + 1, num_samples, rd)
                if (idx + 1) % 10 == 0:
                    logger.info(f"Processed {idx + 1}/{num_samples} queries...")
            except Exception as e:
                # Skip this sample but keep the run going; log the full traceback.
                logger.exception(f"Error on query {idx}: {e}")
        # Run BERTScore on full batch (more efficient than per-sample)
        bertscore_results = {}
        if run_bertscore and self.eval_samples:
            logger.info("Computing BERTScore for all pipelines...")
            for pipe in ["llm_only", "baseline", "graphrag"]:
                try:
                    bs = self.evaluator.evaluate_bertscore_batch(self.eval_samples, pipeline=pipe)
                    bertscore_results[pipe] = bs
                    logger.info(f"  {pipe}: mean_f1={bs.get('mean_f1', 0):.4f}, "
                                f"pass_rate={bs.get('pass_rate', 0):.1%}")
                except Exception as e:
                    logger.warning(f"  BERTScore for {pipe} failed: {e}")
        aggregate = self.evaluator.compute_aggregate_metrics()
        report = self.evaluator.generate_report()
        return {
            "results": results,
            "aggregate": aggregate,
            "bertscore": bertscore_results,
            "report": report,
            "num_completed": len(results),
            "num_requested": num_samples,
        }
    def get_results_dataframe(self):
        """Return the accumulated per-query results as a pandas DataFrame."""
        import pandas as pd
        return pd.DataFrame(self.benchmark_results) if self.benchmark_results else pd.DataFrame()
    def save_results(self, filepath: str):
        """Write per-query results plus aggregate metrics to a JSON file."""
        with open(filepath, "w") as f:
            json.dump({
                "results": self.benchmark_results,
                "aggregate": self.evaluator.compute_aggregate_metrics(),
            }, f, indent=2, default=str)
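
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module).
# Assumes InferenceOrchestrator and EvaluationLayer can be built without
# arguments; in the real app they are presumably configured elsewhere with
# model/endpoint settings. Because this module uses relative imports, run it
# with `python -m` from the package root rather than as a plain script.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    runner = BenchmarkRunner(
        orchestrator=InferenceOrchestrator(),  # hypothetical no-arg construction
        evaluator=EvaluationLayer(),           # hypothetical no-arg construction
    )
    summary = runner.run_hotpotqa_benchmark(
        num_samples=10,       # small smoke-test run
        run_judge=False,      # skip the LLM judge to avoid extra API calls
        run_bertscore=False,  # skip BERTScore for a fast first pass
    )
    print(summary["report"])
    runner.save_results("benchmark_results.json")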