| """ | |
| graders.py β Deterministic graders for all 3 Cloud Incident Response tasks. | |
| Public API: | |
| grade(task_id, state, scenario) -> {"total": float, "breakdown": dict, "feedback": str} | |
| All scores are in [0.0, 1.0]. | |
| Graders are fully deterministic and reproducible β same inputs always produce same score. | |
| """ | |
| from __future__ import annotations | |


def grade(task_id: str, state: dict, scenario: dict) -> dict:
    """Route to the correct task grader."""
    _graders = {
        "alert_classification": _grade_alert_classification,
        "root_cause_analysis": _grade_root_cause_analysis,
        "remediation_planning": _grade_remediation_planning,
    }
    fn = _graders.get(task_id)
    if fn is None:
        return {
            "total": 0.0,
            "breakdown": {},
            "feedback": f"Unknown task_id '{task_id}'",
        }
    return fn(state, scenario)
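

# Illustrative usage only (not called by the environment): a minimal sketch of
# the public entry point. The state/scenario shapes below are assumptions made
# for demonstration, not fixtures from the real scenario catalogue.
def _example_grade_dispatch() -> None:
    state = {
        "action_history": [
            {"action_type": "submit_severity", "parameters": {"severity": "p1"}},
        ]
    }
    scenario = {"correct_severity": "P1", "adjacent_severities": ["P2"]}
    result = grade("alert_classification", state, scenario)
    assert result["total"] == 1.0  # exact severity match
    assert "severity_match" in result["breakdown"]
    # Unknown task ids fail closed with a zero score rather than raising.
    assert grade("no_such_task", state, scenario)["total"] == 0.0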


# ── Task 1: Alert Classification ─────────────────────────────────────────────
def _grade_alert_classification(state: dict, scenario: dict) -> dict:
    """
    1.0  -> exact severity match
    0.5  -> adjacent severity (one level off)
    0.25 -> two levels off
    0.0  -> three levels off or no submission
    """
    history = state.get("action_history", [])
    correct = scenario.get("correct_severity", "P1")
    adjacent = scenario.get("adjacent_severities", [])
    order = ["P1", "P2", "P3", "P4"]

    # Only the first submitted severity counts; later submissions are ignored.
    submitted = None
    for a in history:
        if a.get("action_type") == "submit_severity":
            submitted = a.get("parameters", {}).get("severity", "").upper().strip()
            break

    if not submitted:
        return {
            "total": 0.0,
            "breakdown": {"submitted": False, "severity_match": 0.0},
            "feedback": "No severity submitted -> score 0.0",
        }

    if submitted == correct:
        score, msg = 1.0, f"Exact match: {submitted}"
    elif submitted in adjacent:
        score, msg = 0.5, f"Adjacent: submitted {submitted}, correct {correct}"
    else:
        try:
            dist = abs(order.index(submitted) - order.index(correct))
        except ValueError:
            # Unrecognised severity label: treat as maximally distant.
            dist = 4
        score = 0.25 if dist == 2 else 0.0
        msg = f"Wrong: submitted {submitted}, correct {correct} (distance={dist})"

    return {
        "total": score,
        "breakdown": {
            "submitted_severity": submitted,
            "correct_severity": correct,
            "severity_match": score,
        },
        "feedback": msg,
    }
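

# Illustrative only: how the severity-distance rubric plays out, assuming the
# scenario's adjacent_severities list contains the neighbouring level.
def _example_alert_classification_scoring() -> None:
    scenario = {"correct_severity": "P1", "adjacent_severities": ["P2"]}
    one_off = {
        "action_history": [
            {"action_type": "submit_severity", "parameters": {"severity": "p2"}},
        ]
    }
    two_off = {
        "action_history": [
            {"action_type": "submit_severity", "parameters": {"severity": "P3"}},
        ]
    }
    assert _grade_alert_classification(one_off, scenario)["total"] == 0.5   # adjacent
    assert _grade_alert_classification(two_off, scenario)["total"] == 0.25  # two levels off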


# ── Task 2: Root Cause Analysis ──────────────────────────────────────────────
def _grade_root_cause_analysis(state: dict, scenario: dict) -> dict:
    """
    Base (0.0-0.6):
        0.60 -> correct service + failure mode keyword match
        0.35 -> correct service only
        0.00 -> wrong service
    Efficiency bonus (0.0-0.4):
        Rewards targeted investigation (relevant queries / total queries).
        Penalises spray-and-pray approaches.
    """
    history = state.get("action_history", [])
    correct_rc = scenario.get("correct_root_cause", {})
    correct_svc = correct_rc.get("service", "").lower().strip()
    correct_mode = correct_rc.get("failure_mode", "").lower().strip()
    known = {s.lower() for s in scenario.get("known_services", set())}
    diag_types = {
        "query_logs", "check_metrics", "check_dependencies",
        "check_recent_deploys", "check_service_status",
    }

    # Find submit_root_cause
    sub_svc, sub_mode, sub_step = "", "", len(history)
    for a in history:
        if a.get("action_type") == "submit_root_cause":
            p = a.get("parameters", {})
            sub_svc = p.get("service", "").lower().strip()
            sub_mode = p.get("failure_mode", "").lower().strip()
            sub_step = a.get("step", len(history))
            break

    if not sub_svc:
        return {
            "total": 0.0,
            "breakdown": {"base": 0.0, "efficiency": 0.0, "submitted": False},
            "feedback": "No root cause submitted -> score 0.0",
        }

    svc_match = sub_svc == correct_svc
    mode_kws = [w for w in correct_mode.split() if len(w) > 3]
    mode_match = svc_match and (
        any(kw in sub_mode for kw in mode_kws) if mode_kws else True
    )

    if mode_match:
        base, base_fb = 0.6, "Correct service + failure mode"
    elif svc_match:
        base, base_fb = 0.35, "Correct service only - failure mode unclear"
    else:
        base, base_fb = 0.0, f"Wrong service: '{sub_svc}' (correct: '{correct_svc}')"

    # Efficiency bonus
    efficiency = 0.0
    if svc_match:
        pre_submit = [
            a for a in history[:sub_step]
            if a.get("action_type") in diag_types
        ]
        queried_svcs = {
            a.get("parameters", {}).get("service", "").lower()
            for a in pre_submit
        }
        relevant = queried_svcs & known
        total_q = len(pre_submit)
        if total_q > 0:
            precision = len(relevant) / max(total_q, 1)
            # Bonus: 0.0-0.4, rewarding targeted queries
            efficiency = round(
                min(0.4, precision * 0.4 + min(len(relevant), 3) * 0.05), 4
            )

    total = round(min(1.0, base + efficiency), 4)
    return {
        "total": total,
        "breakdown": {
            "base": base,
            "efficiency_bonus": efficiency,
            "service_match": svc_match,
            "mode_match": mode_match,
            "submitted_service": sub_svc,
            "correct_service": correct_svc,
        },
        "feedback": (
            f"{base_fb} | efficiency={efficiency:.2f} | total={total:.2f}"
        ),
    }
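

# Illustrative only: a worked example of the base + efficiency split, using
# hypothetical service names and a hypothetical failure mode. It assumes
# actions carry an integer "step" field, as the grader above expects.
def _example_root_cause_scoring() -> None:
    scenario = {
        "correct_root_cause": {
            "service": "payments-api",
            "failure_mode": "connection pool exhaustion",
        },
        "known_services": ["payments-api", "checkout", "auth"],
    }
    state = {
        "action_history": [
            {"action_type": "check_metrics",
             "parameters": {"service": "payments-api"}, "step": 0},
            {"action_type": "query_logs",
             "parameters": {"service": "payments-api"}, "step": 1},
            {"action_type": "submit_root_cause",
             "parameters": {"service": "payments-api",
                            "failure_mode": "connection pool exhausted"},
             "step": 2},
        ]
    }
    result = _grade_root_cause_analysis(state, scenario)
    # base = 0.6 ("connection"/"pool" keywords hit); efficiency =
    # min(0.4, (1 relevant service / 2 queries) * 0.4 + 1 * 0.05) = 0.25
    assert result["breakdown"]["base"] == 0.6
    assert result["breakdown"]["efficiency_bonus"] == 0.25
    assert abs(result["total"] - 0.85) < 1e-9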


# ── Task 3: Remediation Planning ─────────────────────────────────────────────
def _grade_remediation_planning(state: dict, scenario: dict) -> dict:
    """
    Base (0.0 or 0.6):
        0.6 -> submit_resolution with a summary, plus ≥1 investigation or
               remediation action anywhere in the history
    Efficiency bonus (0.0-0.3):
        Fraction of the correct remediation sequence steps executed
    Wrong action penalty (0.0-0.15):
        -0.05 per wrong action, capped at -0.15
    Summary quality bonus (0.0-0.1):
        +0.10 if the summary contains ≥3 resolution keywords
        +0.05 if the summary contains ≥1 resolution keyword
    """
    history = state.get("action_history", [])
    correct_seq = scenario.get("correct_remediation_sequence", [])
    wrong_map = scenario.get("wrong_actions", {})
    keywords = scenario.get("resolution_keywords", [])
    diag_rem = {
        "query_logs", "check_metrics", "check_dependencies",
        "check_recent_deploys", "check_service_status",
        "restart_service", "rollback_deploy", "scale_service",
        "disable_feature_flag", "clear_cache", "execute_runbook_step",
    }

    summary = ""
    for a in history:
        if a.get("action_type") == "submit_resolution":
            summary = a.get("parameters", {}).get("summary", "")
            break

    inv_count = sum(1 for a in history if a.get("action_type") in diag_rem)
    if not summary or inv_count < 1:
        return {
            "total": 0.0,
            "breakdown": {
                "base": 0.0, "efficiency": 0.0,
                "penalty": 0.0, "summary_bonus": 0.0,
            },
            "feedback": "No resolution submitted or no investigation -> score 0.0",
        }

    base = 0.6

    # Build executed action key set
    executed = set()
    for a in history:
        at = a.get("action_type", "")
        p = a.get("parameters", {})
        svc = p.get("service", "")
        flag = p.get("flag", "")
        runbook = p.get("runbook_action", "")
        target = p.get("target", "")
        executed.add(at)
        if svc:
            executed.add(f"{at}:{svc}")
        if flag:
            executed.add(f"{at}:{flag}")
        if runbook:
            executed.add(f"execute_runbook_step:{runbook}")
        if target:
            executed.add(f"execute_runbook_step:{target}")

    matched = sum(1 for k in correct_seq if k in executed)
    efficiency = round((matched / len(correct_seq)) * 0.3, 4) if correct_seq else 0.0

    # Wrong action penalty
    wrong_count = sum(
        1 for a in history
        if (a.get("action_type") in wrong_map or
            f"{a.get('action_type')}:{a.get('parameters', {}).get('service', '')}"
            in wrong_map)
    )
    penalty = round(min(0.15, wrong_count * 0.05), 4)

    # Summary quality
    sl = summary.lower()
    hits = sum(1 for kw in keywords if kw in sl)
    summary_bonus = 0.10 if hits >= 3 else (0.05 if hits >= 1 else 0.0)

    total = round(max(0.0, min(1.0, base + efficiency - penalty + summary_bonus)), 4)
    return {
        "total": total,
        "breakdown": {
            "base": base,
            "efficiency_bonus": efficiency,
            "wrong_action_penalty": -penalty,
            "summary_bonus": summary_bonus,
            "correct_actions_matched": matched,
            "correct_actions_total": len(correct_seq),
            "wrong_actions_count": wrong_count,
            "summary_keywords_hit": hits,
        },
        "feedback": (
            f"base={base} | efficiency={efficiency:.2f} "
            f"({matched}/{len(correct_seq)} correct) | "
            f"penalty=-{penalty:.2f} | summary={summary_bonus:.2f} | "
            f"total={total:.2f}"
        ),
    }
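

# Illustrative only: a remediation-planning walk-through. It assumes that
# correct_remediation_sequence entries use the same "action_type" or
# "action_type:service" key format the executed-action set is built from, and
# all service names / keywords below are hypothetical.
def _example_remediation_scoring() -> None:
    scenario = {
        "correct_remediation_sequence": [
            "rollback_deploy:checkout-service",
            "check_service_status:checkout-service",
        ],
        "wrong_actions": {"restart_service:auth-service": "unrelated service"},
        "resolution_keywords": ["rollback", "checkout", "deploy"],
    }
    state = {
        "action_history": [
            {"action_type": "check_recent_deploys",
             "parameters": {"service": "checkout-service"}},
            {"action_type": "rollback_deploy",
             "parameters": {"service": "checkout-service"}},
            {"action_type": "submit_resolution",
             "parameters": {"summary": "Performed a rollback of the bad checkout deploy."}},
        ]
    }
    result = _grade_remediation_planning(state, scenario)
    # base 0.6, efficiency 0.15 (1 of 2 sequence keys executed), no penalty,
    # summary bonus 0.10 (3 keyword hits) -> total 0.85
    assert result["breakdown"]["correct_actions_matched"] == 1
    assert result["breakdown"]["wrong_actions_count"] == 0
    assert abs(result["total"] - 0.85) < 1e-9


if __name__ == "__main__":
    # Smoke-test the illustrative examples above.
    _example_grade_dispatch()
    _example_alert_classification_scoring()
    _example_root_cause_scoring()
    _example_remediation_scoring()
    print("All example checks passed.")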