EntropyEnv / server /benchmark_store.py
immortalindeed's picture
Fix UI leaderboard fields and commit uv.lock
bdb64b6
# server/benchmark_store.py
# Persists benchmark results to disk so they survive server restarts.
# Used by both inference.py (CLI) and web_ui.py (frontend).
import json
import os
from datetime import datetime
from typing import List, Dict
_STORE_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'results', 'run_history.json'
)
os.makedirs(os.path.dirname(_STORE_PATH), exist_ok=True)
def _load() -> List[Dict]:
"""Load all benchmark results from disk."""
if not os.path.exists(_STORE_PATH):
return []
try:
with open(_STORE_PATH, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, list) else []
except (json.JSONDecodeError, IOError):
return []
def _save(results: List[Dict]) -> None:
"""Save all benchmark results to disk."""
try:
with open(_STORE_PATH, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, default=str)
except IOError as e:
print(f"[benchmark_store] WARNING: Could not save results: {e}")
def append_result(model: str, model_id: str, scores: Dict[str, float]) -> Dict:
"""Add a new benchmark result and persist to disk. Returns the saved entry."""
avg = round(sum(scores.values()) / max(len(scores), 1), 4)
entry = {
'model_name': model,
'model_id': model_id,
'scores': scores,
'average': avg,
'type': 'full_run',
'timestamp': datetime.utcnow().isoformat(),
}
results = _load()
results.append(entry)
_save(results)
return entry
def get_all() -> List[Dict]:
"""Return all benchmark results, newest first."""
results = _load()
for r in results:
if 'average' not in r and 'avg' in r:
r['average'] = r['avg']
if 'model_name' not in r and 'model' in r:
r['model_name'] = r['model']
return sorted(results, key=lambda x: x.get('timestamp', ''), reverse=True)
def get_leaderboard() -> List[Dict]:
"""Return deduplicated leaderboard: best score per model_id."""
results = _load()
best: Dict[str, Dict] = {}
for r in results:
mid = r.get('model_id', r.get('model_name', r.get('model', 'unknown')))
val = r.get('average', r.get('avg', 0))
best_val = best[mid].get('average', best[mid].get('avg', 0)) if mid in best else -1
if mid not in best or val > best_val:
best[mid] = r
return sorted(best.values(), key=lambda x: x.get('average', x.get('avg', 0)), reverse=True)