adeshboudh16 commited on
Commit
75fe6a6
·
1 Parent(s): f57ca63

refactor: simplify phase 2 to sequential row-by-row scoring, comment out batch/sleep/thread code

Browse files
Files changed (1) hide show
  1. scripts/run_eval.py +76 -123
scripts/run_eval.py CHANGED
@@ -34,14 +34,14 @@ Judge provider (Phase 2):
34
  from __future__ import annotations
35
 
36
  import argparse
37
- import asyncio
38
  import json
39
  import math
40
  import os
41
  import sys
42
  import time
43
  import io
44
- from concurrent.futures import ThreadPoolExecutor, as_completed
45
  from datetime import datetime, timezone
46
  from pathlib import Path
47
 
@@ -51,9 +51,9 @@ if sys.stdout.encoding != "utf-8":
51
 
52
  sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
53
 
54
- # ── Rate-limit constants (override via env vars) ───────────────────────────────
55
- BATCH_SIZE = int(os.getenv("BATCH_SIZE", "2")) # 1 row/batch keeps burst ≤5 calls
56
- BATCH_DELAY_SEC = int(os.getenv("BATCH_DELAY_SEC", "60"))
57
  PASS_THRESHOLD = float(os.getenv("PASS_THRESHOLD", "0.7"))
58
  EVAL_LIMIT = int(os.getenv("EVAL_LIMIT", "0")) or None # 0 = no limit
59
 
@@ -164,16 +164,16 @@ def build_judge_pool() -> list[tuple]:
164
  return [build_judge(key1)]
165
 
166
 
167
- def score_batch_in_thread(batch: list[dict], judge_llm, judge_embeddings, label: str = "") -> list[dict]:
168
- """Thread-safe wrapper: gives each worker thread its own asyncio event loop."""
169
- ids = [r["id"] for r in batch]
170
- _log(f"starting {ids}", label)
171
- loop = asyncio.new_event_loop()
172
- asyncio.set_event_loop(loop)
173
- try:
174
- return score_batch(batch, judge_llm, judge_embeddings, label=label)
175
- finally:
176
- loop.close()
177
 
178
 
179
  # ── Dataset helpers ────────────────────────────────────────────────────────────
@@ -277,71 +277,44 @@ def _safe_metric(val, default: float = 0.0) -> float:
277
  return default
278
 
279
 
280
- def score_batch(batch: list[dict], judge_llm, judge_embeddings, label: str = "") -> list[dict]:
 
281
  from ragas.metrics.collections import Faithfulness, AnswerRelevancy, ContextPrecision
282
 
283
- scoreable = [r for r in batch if r["answer"] and r["contexts"]]
284
- skipped = [r for r in batch if not (r["answer"] and r["contexts"])]
285
-
286
- ids = [r["id"] for r in scoreable]
287
- scored = []
288
- if scoreable:
289
- # Collections metrics use their own .batch_score() API, not ragas.evaluate().
290
- # evaluate() only works with ragas.metrics.Metric subclasses; these inherit
291
- # from ragas.metrics.collections.base.BaseMetric which is a different hierarchy.
292
- f_metric = Faithfulness(llm=judge_llm)
293
- ar_metric = AnswerRelevancy(llm=judge_llm, embeddings=judge_embeddings)
294
- cp_metric = ContextPrecision(llm=judge_llm)
295
-
296
- # Sleep BATCH_DELAY_SEC between each metric so the per-minute quota resets.
297
- # abatch_score fires all rows concurrently (asyncio.gather), so even with
298
- # BATCH_SIZE=1 we get ~2-5 calls per metric. Sleeping 60 s between metrics
299
- # keeps us well under Gemini free tier's 15 RPM cap.
300
- _log(f"faithfulness → scoring {ids}", label)
301
- t0 = time.perf_counter()
302
- f_results = f_metric.batch_score([
303
- {"user_input": r["query"], "response": r["answer"], "retrieved_contexts": r["contexts"]}
304
- for r in scoreable
305
- ])
306
- _log(f"faithfulness ✓ done ({time.perf_counter() - t0:.1f}s)", label)
307
- # _sleep_log(BATCH_DELAY_SEC, label) # rate-limit delay — disabled for osmapi
308
-
309
- _log(f"answer_relevancy → scoring {ids}", label)
310
- t0 = time.perf_counter()
311
- ar_results = ar_metric.batch_score([
312
- {"user_input": r["query"], "response": r["answer"]}
313
- for r in scoreable
314
- ])
315
- _log(f"answer_relevancy ✓ done ({time.perf_counter() - t0:.1f}s)", label)
316
- # _sleep_log(BATCH_DELAY_SEC, label) # rate-limit delay — disabled for osmapi
317
-
318
- _log(f"context_precision → scoring {ids}", label)
319
- t0 = time.perf_counter()
320
- cp_results = cp_metric.batch_score([
321
- {"user_input": r["query"], "reference": r["ground_truth"], "retrieved_contexts": r["contexts"]}
322
- for r in scoreable
323
- ])
324
- _log(f"context_precision ✓ done ({time.perf_counter() - t0:.1f}s)", label)
325
-
326
- for row, f_r, ar_r, cp_r in zip(scoreable, f_results, ar_results, cp_results):
327
- row = dict(row)
328
- row["faithfulness"] = round(_safe_metric(f_r.value), 3)
329
- row["answer_relevancy"] = round(_safe_metric(ar_r.value), 3)
330
- row["context_precision"] = round(_safe_metric(cp_r.value), 3)
331
- row["pass"] = (
332
- row["faithfulness"] >= PASS_THRESHOLD
333
- and row["answer_relevancy"] >= PASS_THRESHOLD
334
- and row["context_precision"] >= PASS_THRESHOLD
335
- )
336
- scored.append(row)
337
-
338
- for row in skipped:
339
- row = dict(row)
340
  row["faithfulness"] = row["answer_relevancy"] = row["context_precision"] = 0.0
341
  row["pass"] = False
342
- scored.append(row)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
343
 
344
- return scored
 
 
345
 
346
 
347
  def compute_group_stats(rows: list[dict], key: str) -> dict:
@@ -394,60 +367,41 @@ def print_summary(all_rows: list[dict]) -> None:
394
 
395
 
396
  def run_phase2(invoked: list[dict]) -> list[dict]:
397
- print(f"\nPhase 2: RAGAS scoring {len(invoked)} rows in batches of {BATCH_SIZE}...")
398
- judges = build_judge_pool()
399
- num_workers = len(judges)
400
  all_scored: list[dict] = []
401
 
402
- batches = [invoked[i:i + BATCH_SIZE] for i in range(0, len(invoked), BATCH_SIZE)]
403
-
404
- if num_workers == 1:
405
- judge_llm, judge_embeddings = judges[0]
406
- for batch_num, batch in enumerate(batches, 1):
407
- ids = [r["id"] for r in batch]
408
- _log(f"Batch {batch_num}/{len(batches)}: {ids}")
409
- scored = score_batch(batch, judge_llm, judge_embeddings, label=f"B{batch_num}")
410
- all_scored.extend(scored)
411
- _log(f"Batch {batch_num}/{len(batches)} complete")
412
- if batch_num < len(batches):
413
- _sleep_log(BATCH_DELAY_SEC)
414
- else:
415
- # Two workers: interleave batches across keys (worker 0 = even, worker 1 = odd).
416
- # Each thread runs its own asyncio event loop to avoid cross-thread conflicts.
417
- futures: dict = {}
418
- with ThreadPoolExecutor(max_workers=2) as executor:
419
- for i, batch in enumerate(batches):
420
- judge_llm, judge_emb = judges[i % 2]
421
- ids = [r["id"] for r in batch]
422
- worker_label = f"W{i % 2 + 1}-B{i + 1}"
423
- _log(f"Submitting batch {i + 1}/{len(batches)}: {ids} → worker {i % 2 + 1}")
424
- f = executor.submit(score_batch_in_thread, batch, judge_llm, judge_emb, worker_label)
425
- futures[f] = i
426
-
427
- results: dict[int, list[dict]] = {}
428
- for f in as_completed(futures):
429
- idx = futures[f]
430
- try:
431
- results[idx] = f.result(timeout=300) # 5-min cap per batch
432
- except Exception as exc:
433
- _log(f"Batch {idx + 1} FAILED ({type(exc).__name__}: {exc}) — skipping with zeros")
434
- results[idx] = []
435
- else:
436
- _log(f"Batch {idx + 1} complete")
437
-
438
- for i in sorted(results):
439
- all_scored.extend(results[i])
440
 
441
  print_summary(all_scored)
442
 
443
  jurisdictions = sorted({r["jurisdiction"] or "MULTI" for r in all_scored})
444
  query_types = sorted({r["query_type"] for r in all_scored})
445
  report = {
446
- "run_at": datetime.now(timezone.utc).isoformat(),
447
- "dataset_size": len(all_scored),
448
- "batch_size": BATCH_SIZE,
449
- "batch_delay_sec": BATCH_DELAY_SEC,
450
- "pass_threshold": PASS_THRESHOLD,
451
  "overall": compute_group_stats(all_scored, "overall"),
452
  "by_jurisdiction": {
453
  jur: compute_group_stats([r for r in all_scored if (r["jurisdiction"] or "MULTI") == jur], jur)
@@ -482,8 +436,7 @@ def main() -> None:
482
  if EVAL_LIMIT:
483
  rows = rows[:EVAL_LIMIT]
484
 
485
- print(f"CivicSetu RAGAS Eval — {len(rows)} queries | "
486
- f"batch_size={BATCH_SIZE} delay={BATCH_DELAY_SEC}s threshold={PASS_THRESHOLD}")
487
 
488
  if args.phase == 1:
489
  run_phase1(rows)
 
34
  from __future__ import annotations
35
 
36
  import argparse
37
+ # import asyncio # not needed — sequential mode
38
  import json
39
  import math
40
  import os
41
  import sys
42
  import time
43
  import io
44
+ # from concurrent.futures import ThreadPoolExecutor, as_completed # not needed — sequential mode
45
  from datetime import datetime, timezone
46
  from pathlib import Path
47
 
 
51
 
52
  sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
53
 
54
+ # ── Constants ──────────────────────────────────────────────────────────────────
55
+ # BATCH_SIZE = int(os.getenv("BATCH_SIZE", "2")) # commented out — sequential mode
56
+ # BATCH_DELAY_SEC = int(os.getenv("BATCH_DELAY_SEC", "60")) # commented out — no rate-limit sleep
57
  PASS_THRESHOLD = float(os.getenv("PASS_THRESHOLD", "0.7"))
58
  EVAL_LIMIT = int(os.getenv("EVAL_LIMIT", "0")) or None # 0 = no limit
59
 
 
164
  return [build_judge(key1)]
165
 
166
 
167
+ # def score_batch_in_thread(batch, judge_llm, judge_embeddings, label=""):
168
+ # """Commented out — was used for parallel dual-worker mode."""
169
+ # ids = [r["id"] for r in batch]
170
+ # _log(f"starting {ids}", label)
171
+ # loop = asyncio.new_event_loop()
172
+ # asyncio.set_event_loop(loop)
173
+ # try:
174
+ # return score_batch(batch, judge_llm, judge_embeddings, label=label)
175
+ # finally:
176
+ # loop.close()
177
 
178
 
179
  # ── Dataset helpers ────────────────────────────────────────────────────────────
 
277
  return default
278
 
279
 
280
+ def score_row(row: dict, judge_llm, judge_embeddings) -> dict:
281
+ """Score a single row with all three RAGAS metrics. Simple sequential API calls."""
282
  from ragas.metrics.collections import Faithfulness, AnswerRelevancy, ContextPrecision
283
 
284
+ row = dict(row)
285
+
286
+ if not row["answer"] or not row["contexts"]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
287
  row["faithfulness"] = row["answer_relevancy"] = row["context_precision"] = 0.0
288
  row["pass"] = False
289
+ return row
290
+
291
+ f_metric = Faithfulness(llm=judge_llm)
292
+ ar_metric = AnswerRelevancy(llm=judge_llm, embeddings=judge_embeddings)
293
+ cp_metric = ContextPrecision(llm=judge_llm)
294
+
295
+ f_results = f_metric.batch_score([
296
+ {"user_input": row["query"], "response": row["answer"], "retrieved_contexts": row["contexts"]}
297
+ ])
298
+ ar_results = ar_metric.batch_score([
299
+ {"user_input": row["query"], "response": row["answer"]}
300
+ ])
301
+ cp_results = cp_metric.batch_score([
302
+ {"user_input": row["query"], "reference": row["ground_truth"], "retrieved_contexts": row["contexts"]}
303
+ ])
304
+
305
+ row["faithfulness"] = round(_safe_metric(f_results[0].value), 3)
306
+ row["answer_relevancy"] = round(_safe_metric(ar_results[0].value), 3)
307
+ row["context_precision"] = round(_safe_metric(cp_results[0].value), 3)
308
+ row["pass"] = (
309
+ row["faithfulness"] >= PASS_THRESHOLD
310
+ and row["answer_relevancy"] >= PASS_THRESHOLD
311
+ and row["context_precision"] >= PASS_THRESHOLD
312
+ )
313
+ return row
314
 
315
+ # def score_batch(batch, judge_llm, judge_embeddings, label=""):
316
+ # """Commented out — was the batched+sleep scoring path for Gemini free-tier rate limits."""
317
+ # ... (see git history)
318
 
319
 
320
  def compute_group_stats(rows: list[dict], key: str) -> dict:
 
367
 
368
 
369
  def run_phase2(invoked: list[dict]) -> list[dict]:
370
+ print(f"\nPhase 2: RAGAS scoring {len(invoked)} rows (sequential)...")
371
+ judge_llm, judge_embeddings = build_judge_pool()[0]
 
372
  all_scored: list[dict] = []
373
 
374
+ for i, row in enumerate(invoked, 1):
375
+ _log(f"[{i:02}/{len(invoked)}] {row['id']}")
376
+ try:
377
+ scored = score_row(row, judge_llm, judge_embeddings)
378
+ except Exception as exc:
379
+ _log(f" FAILED ({type(exc).__name__}: {exc}) — skipping with zeros")
380
+ scored = dict(row)
381
+ scored["faithfulness"] = scored["answer_relevancy"] = scored["context_precision"] = 0.0
382
+ scored["pass"] = False
383
+ all_scored.append(scored)
384
+ _log(f" faith={scored['faithfulness']:.2f} rel={scored['answer_relevancy']:.2f} prec={scored['context_precision']:.2f} {'PASS' if scored['pass'] else 'fail'}")
385
+
386
+ # ── Commented out: batched + parallel + rate-limit-sleep mode ──────────────
387
+ # batches = [invoked[i:i + BATCH_SIZE] for i in range(0, len(invoked), BATCH_SIZE)]
388
+ # if num_workers == 1:
389
+ # for batch_num, batch in enumerate(batches, 1):
390
+ # scored = score_batch(batch, judge_llm, judge_embeddings, label=f"B{batch_num}")
391
+ # all_scored.extend(scored)
392
+ # _sleep_log(BATCH_DELAY_SEC)
393
+ # else: # ThreadPoolExecutor dual-worker path β€” see git history
394
+ # ───────────────────────────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
395
 
396
  print_summary(all_scored)
397
 
398
  jurisdictions = sorted({r["jurisdiction"] or "MULTI" for r in all_scored})
399
  query_types = sorted({r["query_type"] for r in all_scored})
400
  report = {
401
+ "run_at": datetime.now(timezone.utc).isoformat(),
402
+ "dataset_size": len(all_scored),
403
+ "mode": "sequential",
404
+ "pass_threshold": PASS_THRESHOLD,
 
405
  "overall": compute_group_stats(all_scored, "overall"),
406
  "by_jurisdiction": {
407
  jur: compute_group_stats([r for r in all_scored if (r["jurisdiction"] or "MULTI") == jur], jur)
 
436
  if EVAL_LIMIT:
437
  rows = rows[:EVAL_LIMIT]
438
 
439
+ print(f"CivicSetu RAGAS Eval — {len(rows)} queries | sequential threshold={PASS_THRESHOLD}")
 
440
 
441
  if args.phase == 1:
442
  run_phase1(rows)