"""
Benchmark Runner — Runs all 3 pipelines on HotpotQA and evaluates
==================================================================
Pipeline 1: LLM-Only (no retrieval)
Pipeline 2: Basic RAG (vector search + LLM)
Pipeline 3: GraphRAG (TigerGraph + novelty engine)
Evaluates with: F1, EM, LLM-as-a-Judge, BERTScore, Context Hit Rate
"""
import json
import logging
from typing import Dict, List, Optional
from .layers.orchestration_layer import InferenceOrchestrator
from .layers.evaluation_layer import (
EvaluationLayer, EvalSample, compute_bertscore
)
logger = logging.getLogger(__name__)
class BenchmarkRunner:
"""Runs benchmarks on HotpotQA with all 3 pipelines and generates comparison metrics."""
def __init__(self, orchestrator, evaluator):
self.orchestrator = orchestrator
self.evaluator = evaluator
self.benchmark_results = []
self.eval_samples: List[EvalSample] = []
def run_hotpotqa_benchmark(self, num_samples=100, split="validation",
top_k=5, hops=2, progress_callback=None,
run_judge=True, run_bertscore=True):
"""Run all 3 pipelines on HotpotQA and evaluate."""
from datasets import load_dataset
logger.info(f"Loading HotpotQA ({split}, n={num_samples})...")
ds = load_dataset("hotpotqa/hotpot_qa", "distractor", split=split)
results = []
self.eval_samples = []
for idx in range(min(num_samples, len(ds))):
row = ds[idx]
query, gold = row["question"], row["answer"]
qtype = row.get("type", "unknown")
level = row.get("level", "unknown")
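            # HotpotQA "distractor" rows, as consumed below, carry the context as
            # parallel lists: row["context"]["title"] (paragraph titles) and
            # row["context"]["sentences"] (one sentence list per title), plus
            # row["supporting_facts"] with matching "title" / "sent_id" lists.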
# Build passages from context
passages = [f"{t}: {' '.join(s)}"
for t, s in zip(row["context"]["title"], row["context"]["sentences"])]
# Extract supporting facts for context hit rate
sf = []
for t, si in zip(row["supporting_facts"]["title"], row["supporting_facts"]["sent_id"]):
for ct, cs in zip(row["context"]["title"], row["context"]["sentences"]):
if ct == t and si < len(cs):
sf.append(cs[si])
try:
# Run all 3 pipelines
lo = self.orchestrator.run_llm_only(query)
b = self.orchestrator.run_baseline_rag(query, passages, top_k)
g = self.orchestrator.run_graphrag(query, passages, hops=hops)
sample = EvalSample(
query=query, reference_answer=gold,
llm_only_answer=lo.answer,
baseline_answer=b.answer,
graphrag_answer=g.answer,
baseline_contexts=b.contexts,
graphrag_contexts=g.contexts,
question_type=qtype, difficulty=str(level),
supporting_facts=sf)
self.eval_samples.append(sample)
er = self.evaluator.evaluate_sample(
sample,
llm_only_tokens=lo.total_tokens,
baseline_tokens=b.total_tokens,
graphrag_tokens=g.total_tokens,
llm_only_cost=lo.cost_usd,
baseline_cost=b.cost_usd,
graphrag_cost=g.cost_usd,
llm_only_latency=lo.latency_ms,
baseline_latency=b.latency_ms,
graphrag_latency=g.latency_ms,
run_judge=run_judge,
)
rd = {
"idx": idx, "query": query, "gold_answer": gold,
"question_type": qtype, "level": level,
# Answers
"llm_only_answer": lo.answer,
"baseline_answer": b.answer,
"graphrag_answer": g.answer,
# F1 / EM
"llm_only_f1": er.llm_only_f1,
"baseline_f1": er.baseline_f1,
"graphrag_f1": er.graphrag_f1,
"llm_only_em": er.llm_only_em,
"baseline_em": er.baseline_em,
"graphrag_em": er.graphrag_em,
# LLM-as-Judge
"llm_only_judge": er.llm_only_judge,
"baseline_judge": er.baseline_judge,
"graphrag_judge": er.graphrag_judge,
# Tokens / Cost / Latency
"llm_only_tokens": lo.total_tokens,
"baseline_tokens": b.total_tokens,
"graphrag_tokens": g.total_tokens,
"llm_only_cost": lo.cost_usd,
"baseline_cost": b.cost_usd,
"graphrag_cost": g.cost_usd,
"llm_only_latency": lo.latency_ms,
"baseline_latency": b.latency_ms,
"graphrag_latency": g.latency_ms,
# Context
"baseline_context_hit": er.baseline_context_hit,
"graphrag_context_hit": er.graphrag_context_hit,
"entities_found": len(g.entities_found),
"relations_traversed": len(g.relations_traversed),
}
results.append(rd)
self.benchmark_results.append(rd)
if progress_callback:
progress_callback(idx + 1, num_samples, rd)
if (idx + 1) % 10 == 0:
logger.info(f"Processed {idx + 1}/{num_samples} queries...")
except Exception as e:
logger.error(f"Error on query {idx}: {e}")
# Run BERTScore on full batch (more efficient than per-sample)
bertscore_results = {}
if run_bertscore and self.eval_samples:
logger.info("Computing BERTScore for all pipelines...")
for pipe in ["llm_only", "baseline", "graphrag"]:
try:
bs = self.evaluator.evaluate_bertscore_batch(self.eval_samples, pipeline=pipe)
bertscore_results[pipe] = bs
logger.info(f" {pipe}: mean_f1={bs.get('mean_f1', 0):.4f}, pass_rate={bs.get('pass_rate', 0):.1%}")
except Exception as e:
logger.warning(f" BERTScore for {pipe} failed: {e}")
aggregate = self.evaluator.compute_aggregate_metrics()
report = self.evaluator.generate_report()
return {
"results": results,
"aggregate": aggregate,
"bertscore": bertscore_results,
"report": report,
"num_completed": len(results),
"num_requested": num_samples,
}
def get_results_dataframe(self):
import pandas as pd
return pd.DataFrame(self.benchmark_results) if self.benchmark_results else pd.DataFrame()
def save_results(self, filepath):
with open(filepath, 'w') as f:
json.dump({
"results": self.benchmark_results,
"aggregate": self.evaluator.compute_aggregate_metrics(),
}, f, indent=2, default=str)
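

# Minimal usage sketch, assuming InferenceOrchestrator and EvaluationLayer can be
# constructed without arguments; adapt the constructor calls to the real layer
# implementations. Because of the relative imports above, run this as a module
# (e.g. python -m <package>.benchmark_runner), not as a standalone script.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    orchestrator = InferenceOrchestrator()  # assumed no-arg constructor
    evaluator = EvaluationLayer()           # assumed no-arg constructor
    runner = BenchmarkRunner(orchestrator, evaluator)

    summary = runner.run_hotpotqa_benchmark(
        num_samples=10,       # small smoke-test run
        run_judge=False,      # skip LLM-as-a-Judge to keep costs down
        run_bertscore=False,  # skip BERTScore for a quick pass
    )
    print(summary["report"])
    runner.save_results("hotpotqa_benchmark_results.json")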