"""
graders.py — Deterministic graders for all 3 Cloud Incident Response tasks.

Public API:
    grade(task_id, state, scenario) -> {"total": float, "breakdown": dict, "feedback": str}

All scores are in [0.0, 1.0]; grade() clamps the returned total to (0.0, 1.0) exclusive.

Grading Philosophy:
- Easy task: binary-ish — did you get the severity right?
- Medium task: partial credit for correct service, bonus for efficiency
- Hard task: multi-component — base + efficiency − penalties + summary quality
"""
from __future__ import annotations
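# Example usage (illustrative only; the "state"/"scenario" shapes below are
# hypothetical and come from the task definitions, not from this module):
#
#     result = grade(
#         task_id="alert_classification",
#         state={"action_history": [
#             {"action_type": "submit_severity", "parameters": {"severity": "P1"}},
#         ]},
#         scenario={"correct_severity": "P1", "adjacent_severities": ["P2"]},
#     )
#     result["total"]  # 0.99 (exact match, clamped below 1.0 by grade())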
def _normalise(s: str) -> str:
"""Lowercase, strip whitespace, collapse hyphens/underscores."""
return s.lower().strip().replace("_", "-").replace(" ", "-")
def _svc_match(submitted: str, correct: str) -> bool:
s = _normalise(submitted)
c = _normalise(correct)
if s == c:
return True
if s in c or c in s:
return True
aliases = {
"network": "network-infra",
"network-infrastructure": "network-infra",
"cdn": "cdn-edge",
"postgres": "postgres-db",
"postgresql": "postgres-db",
"analytics": "analytics-service",
"payment": "payment-service",
"auth": "auth-service",
"api": "api-gateway",
"api-gw": "api-gateway",
"fraud": "fraud-detection-service",
"fraud-detection": "fraud-detection-service",
"order": "order-service",
"orders": "order-service",
"image": "image-service",
"images": "image-service",
"product": "product-service",
"products": "product-service",
"redis": "redis-session",
"redis-cache": "redis-payment-cache",
}
return aliases.get(s, s) == c or s == aliases.get(c, c)
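# For example, _svc_match("postgres", "postgres-db") matches via the substring
# rule, _svc_match("api-gw", "api_gateway") matches via the alias table after
# normalisation, and _svc_match("auth", "payment-service") does not match.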
def grade(task_id: str, state: dict, scenario: dict) -> dict:
_graders = {
"alert_classification": _grade_alert_classification,
"root_cause_analysis": _grade_root_cause_analysis,
"remediation_planning": _grade_remediation_planning,
}
fn = _graders.get(task_id)
if fn is None:
return {
"total": 0.01, # ← was 0.0
"breakdown": {},
"feedback": f"Unknown task_id '{task_id}'",
}
result = fn(state, scenario)
    # Clamp to open interval (0, 1) — validator requires strictly between 0 and 1
result["total"] = max(0.01, min(0.99, result["total"]))
return result
# ── Task 1: Alert Classification (Easy) ──────────────────────────────────────
#
# Scoring:
# 1.0  — exact severity match
# 0.5  — adjacent severity (e.g. P1 vs P2)
# 0.25 — two levels off (e.g. P1 vs P3)
# 0.0  — wrong by 3+ levels or no submission
#
# This is genuinely EASY: with 3 steps, an agent queries 1–2 services,
# reads the error_rate + revenue_impact, and classifies. The data is
# unambiguous — the correct answer is clearly derivable from the alert.
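# Worked example (hypothetical submission): with correct_severity="P1" and
# adjacent_severities=["P2"], submitting "P2" scores 0.5 (adjacent), "P3"
# scores 0.25 (two levels off), and "P4" or no submission scores 0.0.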
def _grade_alert_classification(state: dict, scenario: dict) -> dict:
history = state.get("action_history", [])
correct = scenario.get("correct_severity", "P1")
adjacent = scenario.get("adjacent_severities", [])
order = ["P1", "P2", "P3", "P4"]
submitted = None
for a in history:
if a.get("action_type") == "submit_severity":
submitted = a.get("parameters", {}).get("severity", "").upper().strip()
break
if not submitted:
return {
"total": 0.0,
"breakdown": {"submitted": False, "severity_match": 0.0},
"feedback": "No severity submitted β€” score 0.0",
}
if submitted == correct:
score, msg = 1.0, f"Exact match: {submitted}"
elif submitted in adjacent:
score, msg = 0.5, f"Adjacent: submitted {submitted}, correct {correct}"
else:
try:
dist = abs(order.index(submitted) - order.index(correct))
except ValueError:
dist = 4
        # Distance-based fallback (covers scenarios with no adjacency list)
        score = 0.5 if dist == 1 else (0.25 if dist == 2 else 0.0)
msg = f"Wrong: submitted {submitted}, correct {correct} (distance={dist})"
return {
"total": score,
"breakdown": {
"submitted_severity": submitted,
"correct_severity": correct,
"severity_match": score,
},
"feedback": msg,
}
# ── Task 2: Root Cause Analysis (Medium) ─────────────────────────────────────
#
# Scoring (total up to 1.0):
# Base (up to 0.6):
# 0.60 — correct service AND failure mode keywords match
# 0.35 — correct service only
# 0.05–0.20 — wrong service (scales with how many services were investigated)
# Efficiency bonus (up to 0.4, awarded only when the submitted service is correct):
# Based on investigation precision: relevant services queried / total queries,
# plus a bonus for breadth of investigation (up to 3 unique known services)
#
# This is genuinely MEDIUM: the root cause is NOT in the alert's
# affected_services list. The agent must investigate services outside
# the blast radius, correlate log evidence, and identify the upstream
# trigger — this requires multi-hop reasoning across 4–6 services.
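# Worked example (hypothetical run): 5 diagnostic queries, 4 of them against
# known services, followed by the correct service + failure mode gives
# base = 0.60 and efficiency = min(0.4, 4/5 * 0.4 + 3 * 0.05) = 0.40,
# so total = 1.0 (which grade() then clamps to 0.99).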
def _grade_root_cause_analysis(state: dict, scenario: dict) -> dict:
history = state.get("action_history", [])
correct_rc = scenario.get("correct_root_cause", {})
correct_svc = correct_rc.get("service", "").lower().strip()
correct_mode = correct_rc.get("failure_mode", "").lower().strip()
known = {s.lower() for s in scenario.get("known_services", set())}
diag_types = {
"query_logs",
"check_metrics",
"check_dependencies",
"check_recent_deploys",
"check_service_status",
}
sub_svc, sub_mode, sub_step = "", "", len(history)
for a in history:
if a.get("action_type") == "submit_root_cause":
p = a.get("parameters", {})
sub_svc = p.get("service", "").lower().strip()
sub_mode = p.get("failure_mode", "").lower().strip()
sub_step = a.get("step", len(history))
break
if not sub_svc:
return {
"total": 0.0,
"breakdown": {"base": 0.0, "efficiency": 0.0, "submitted": False},
"feedback": "No root cause submitted β€” score 0.0",
}
svc_match = _svc_match(sub_svc, correct_svc)
mode_kws = [w for w in correct_mode.split() if len(w) > 3]
mode_match = svc_match and (
any(kw in sub_mode for kw in mode_kws) if mode_kws else True
)
if mode_match:
base, base_fb = 0.6, "Correct service + failure mode"
elif svc_match:
        base, base_fb = 0.35, "Correct service only — failure mode unclear"
else:
# Give MORE partial credit for investigation effort even with wrong answer
pre_submit_diag = [
a for a in history[:sub_step]
if a.get("action_type") in diag_types
]
investigated = len({
a.get("parameters", {}).get("service", "").lower()
for a in pre_submit_diag
} - {""})
# 0.05 base + up to 0.15 for investigating 3+ services
wrong_base = min(0.20, 0.05 + investigated * 0.05)
base, base_fb = wrong_base, (
f"Wrong service: '{sub_svc}' (correct: '{correct_svc}') β€” "
f"investigated {investigated} services"
)
efficiency = 0.0
if svc_match:
pre_submit = [
a
for a in history[: sub_step]
if a.get("action_type") in diag_types
]
queried_svcs = {
a.get("parameters", {}).get("service", "").lower() for a in pre_submit
}
relevant = queried_svcs & known
total_q = len(pre_submit)
if total_q > 0:
precision = len(relevant) / max(total_q, 1)
efficiency = round(
min(0.4, precision * 0.4 + min(len(relevant), 3) * 0.05), 4
)
total = round(min(1.0, base + efficiency), 4)
return {
"total": total,
"breakdown": {
"base": base,
"efficiency_bonus": efficiency,
"service_match": svc_match,
"mode_match": mode_match,
"submitted_service": sub_svc,
"correct_service": correct_svc,
},
"feedback": f"{base_fb} | efficiency={efficiency:.2f} | total={total:.2f}",
}
# ── Task 3: Remediation Planning (Hard) ──────────────────────────────────────
#
# Scoring (components sum to at most 0.9; the total is clamped to [0, 1]):
# Base (up to 0.5):
# Requires at least 1 diagnostic/remediation action + a summary;
# 0.10 for submitting, plus 0.05 per diagnostic action and 0.05 per
# remediation action (each category capped at 0.20)
# Efficiency bonus (up to 0.3):
# Fraction of correct_remediation_sequence steps matched
# Wrong action penalty (up to -0.30):
# -0.10 per wrong action taken (capped at 3)
# Summary quality bonus (up to 0.10):
# Based on keyword coverage in the resolution summary
#
# This is genuinely HARD: requires multi-phase execution:
# Phase 1: Diagnose (query logs to confirm root cause)
# Phase 2: Remediate (execute 3–5 specific actions in order)
# Phase 3: Document (write a coherent summary with key details)
# Wrong remediation actions actively harm the score. The sequence
# matters. The summary must reference specific services and actions.
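# Worked example (hypothetical run): 3 diagnostic + 4 remediation actions give
# base = 0.10 + 0.15 + 0.20 = 0.45; matching 3 of 4 sequence steps gives
# efficiency = 0.75 * 0.3 = 0.225; one wrong action costs 0.10; a summary
# hitting 3+ keywords adds 0.10; total = 0.45 + 0.225 - 0.10 + 0.10 = 0.675.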
def _grade_remediation_planning(state: dict, scenario: dict) -> dict:
history = state.get("action_history", [])
correct_seq = scenario.get("correct_remediation_sequence", [])
wrong_map = scenario.get("wrong_actions", {})
keywords = scenario.get("resolution_keywords", [])
diag_rem = {
"query_logs",
"check_metrics",
"check_dependencies",
"check_recent_deploys",
"check_service_status",
"restart_service",
"rollback_deploy",
"scale_service",
"disable_feature_flag",
"clear_cache",
"execute_runbook_step",
}
summary = ""
for a in history:
if a.get("action_type") == "submit_resolution":
summary = a.get("parameters", {}).get("summary", "")
break
inv_count = sum(1 for a in history if a.get("action_type") in diag_rem)
if not summary or inv_count < 1:
return {
"total": 0.0,
"breakdown": {
"base": 0.0,
"efficiency": 0.0,
"penalty": 0.0,
"summary_bonus": 0.0,
},
"feedback": "No resolution submitted or no investigation β€” score 0.0",
}
    # Base scales with investigation depth rather than being a flat award for submitting
diag_count = sum(
1 for a in history if a.get("action_type") in {
"query_logs", "check_metrics", "check_dependencies",
"check_recent_deploys", "check_service_status",
}
)
rem_count = sum(
1 for a in history if a.get("action_type") in {
"restart_service", "rollback_deploy", "scale_service",
"disable_feature_flag", "clear_cache", "execute_runbook_step",
}
)
diag_credit = min(0.20, diag_count * 0.05)
rem_credit = min(0.20, rem_count * 0.05)
base = round(0.10 + diag_credit + rem_credit, 4)
executed = set()
for a in history:
at = a.get("action_type", "")
p = a.get("parameters", {})
svc = p.get("service", "")
flag = p.get("flag", "")
runbook = p.get("runbook_action", "")
target = p.get("target", "")
executed.add(at)
if svc:
executed.add(f"{at}:{svc}")
if flag:
executed.add(f"{at}:{flag}")
if runbook:
executed.add(f"execute_runbook_step:{runbook}")
if target:
executed.add(f"execute_runbook_step:{target}")
def _seq_key_matches(seq_key: str) -> bool:
if seq_key in executed:
return True
if ":" in seq_key:
action, target = seq_key.split(":", 1)
for ex_key in executed:
if ":" in ex_key:
ex_action, ex_target = ex_key.split(":", 1)
if ex_action == action and _svc_match(ex_target, target):
return True
return False
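    # For instance, an executed "restart_service:payment" satisfies the sequence
    # key "restart_service:payment-service" via _svc_match, while a bare
    # "clear_cache" sequence key only requires that action type to appear at all.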
matched = sum(1 for k in correct_seq if _seq_key_matches(k))
efficiency = (
round((matched / len(correct_seq)) * 0.3, 4) if correct_seq else 0.0
)
wrong_count = sum(
1
for a in history
if (
a.get("action_type") in wrong_map
or f"{a.get('action_type')}:{a.get('parameters', {}).get('service', '')}"
in wrong_map
)
)
penalty = round(min(0.30, wrong_count * 0.10), 4)
sl = summary.lower()
hits = sum(1 for kw in keywords if kw in sl)
summary_bonus = 0.10 if hits >= 3 else (0.05 if hits >= 1 else 0.0)
total = round(max(0.0, min(1.0, base + efficiency - penalty + summary_bonus)), 4)
return {
"total": total,
"breakdown": {
"base": base,
"efficiency_bonus": efficiency,
"wrong_action_penalty": -penalty,
"summary_bonus": summary_bonus,
"correct_actions_matched": matched,
"correct_actions_total": len(correct_seq),
"wrong_actions_count": wrong_count,
"summary_keywords_hit": hits,
},
"feedback": (
f"base={base} | efficiency={efficiency:.2f} "
f"({matched}/{len(correct_seq)} correct) | "
f"penalty=-{penalty:.2f} | summary={summary_bonus:.2f} | "
f"total={total:.2f}"
),
}
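# Minimal smoke-test sketch (hypothetical scenario/state; real task data will
# define its own keys; this only exercises the grading code paths above):
if __name__ == "__main__":
    demo_scenario = {
        "correct_severity": "P1",
        "adjacent_severities": ["P2"],
    }
    demo_state = {
        "action_history": [
            {"action_type": "query_logs", "parameters": {"service": "payment-service"}},
            {"action_type": "submit_severity", "parameters": {"severity": "P2"}},
        ]
    }
    result = grade("alert_classification", demo_state, demo_scenario)
    print(result["total"], "|", result["feedback"])  # expected: 0.5 | Adjacent: ...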