"""
Deterministic graders for each task.

All graders return a score in [0.0, 1.0] and a feedback message.
"""

import json
import subprocess
import sys
import textwrap
from typing import List, Optional, Tuple

from environment.models import CodeReviewAction, Issue


# ─────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────

def _keyword_hit(text: str, keywords: List[str]) -> bool:
    """Case-insensitive check — does `text` contain any keyword?"""
    text_lower = text.lower()
    return any(kw.lower() in text_lower for kw in keywords)


def _keyword_score(text: str, keywords: List[str]) -> float:
    """Fraction of keywords found in text (0.0 – 1.0).

    Matching is a case-insensitive substring test. An empty keyword list
    scores 0.0.
    """
    if not keywords:
        return 0.0
    # Lower-case the haystack once instead of once per keyword.
    text_lower = text.lower()
    hits = sum(1 for kw in keywords if kw.lower() in text_lower)
    return hits / len(keywords)


def _issue_text(issues: List[Issue]) -> str:
    """Concatenate all issue fields into a single lower-cased string for matching.

    Each issue contributes its `issue_type`, its `description` and, when set,
    its `line_number`, separated by single spaces.
    """
    parts = []
    for issue in issues:
        parts.append(issue.issue_type)
        # Tolerate a missing description so the join below cannot fail on None.
        parts.append(issue.description or "")
        if issue.line_number is not None:
            parts.append(str(issue.line_number))
    return " ".join(parts).lower()


# ─────────────────────────────────────────────────────────────
# Easy grader: syntax error detection
# ─────────────────────────────────────────────────────────────

def grade_easy(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade a syntax-error review.

    Rubric (total 1.0):
      0.35 — identified issue_type == "syntax_error"
      0.35 — description mentions the relevant keywords (colon / if / syntax)
      0.30 — suggested_fix contains the corrected line

    Reads `ground_truth["keywords"]` and `ground_truth["fix_keywords"]`.
    Returns (score, feedback) where feedback is one line per rubric item.
    """
    score = 0.0
    feedback_parts = []

    issue_types = [i.issue_type.lower() for i in action.identified_issues]
    all_text = _issue_text(action.identified_issues) + " " + (action.explanation or "")

    # 1) Issue type check
    if "syntax_error" in issue_types:
        score += 0.35
        feedback_parts.append("✅ Correctly identified as a syntax error (+0.35)")
    else:
        feedback_parts.append(
            f"❌ Expected issue_type='syntax_error', got {issue_types} (+0.00)"
        )

    # 2) Description keyword check
    kw_score = _keyword_score(all_text, ground_truth["keywords"])
    # Matching half of the keywords already earns the full 0.35.
    desc_points = round(0.35 * min(kw_score * 2, 1.0), 3)
    score += desc_points
    feedback_parts.append(
        f"{'✅' if desc_points > 0.1 else '❌'} Description accuracy: "
        f"{desc_points:.2f}/0.35 (keyword match {kw_score:.0%})"
    )

    # 3) Fix quality check
    fix = action.suggested_fix or ""
    if _keyword_hit(fix, ground_truth["fix_keywords"]):
        score += 0.30
        feedback_parts.append("✅ Suggested fix contains the correct patch (+0.30)")
    else:
        feedback_parts.append("❌ Suggested fix missing or incorrect (+0.00)")

    score = round(min(score, 1.0), 4)
    return score, "\n".join(feedback_parts)


# ─────────────────────────────────────────────────────────────
# Medium grader: logic bug detection
# ─────────────────────────────────────────────────────────────

# Test harness appended after the submitted code. It is kept as a fixed,
# column-0 script so the submission's own indentation never interacts with it.
_PALINDROME_HARNESS = """\
import json

cases = [
    ("racecar", True),
    ("hello", False),
    ("amanaplanacanalpanama", True),
    ("abba", True),
    ("abc", False),
]
results = []
for inp, exp in cases:
    try:
        got = is_palindrome(inp)
        results.append([inp, exp, bool(got)])
    except Exception:
        results.append([inp, exp, None])
print(json.dumps(results))
"""


def _run_is_palindrome(code: str) -> List[Tuple[str, bool, Optional[bool]]]:
    """
    Execute the patched `is_palindrome` function in a child interpreter.

    Returns a list of (input, expected, actual) tuples; `actual` is None when
    the call raised. Returns an empty list when the code could not be run
    (non-zero exit, timeout, launch failure, or unparseable output).

    NOTE: the submitted code runs with the grader's own privileges. A
    subprocess with a 5-second timeout limits hangs but is not a security
    sandbox.
    """
    # Dedent the submission on its own and concatenate, rather than
    # interpolating it into an indented template: a multi-line submission
    # keeps its relative indentation and the harness stays at column 0.
    driver = textwrap.dedent(code) + "\n\n" + _PALINDROME_HARNESS

    try:
        out = subprocess.run(
            [sys.executable, "-c", driver],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # Timeout, failure to launch, or un-passable argument (e.g. NUL byte).
        return []

    if out.returncode != 0:
        return []

    # The harness prints its JSON last; ignore anything the submitted code
    # itself wrote to stdout before that.
    lines = out.stdout.strip().splitlines()
    if not lines:
        return []
    try:
        rows = json.loads(lines[-1])
    except ValueError:
        return []

    # Callers unpack each row into three names, so reject malformed output.
    if not isinstance(rows, list):
        return []
    if any(not isinstance(row, list) or len(row) != 3 for row in rows):
        return []
    return [tuple(row) for row in rows]


def grade_medium(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade a logic-bug review.

    Rubric (total 1.0):
      0.25 — identified issue_type == "logic_bug"
      0.25 — description mentions index / off-by-one keywords
      0.50 — suggested fix passes all 5 test cases (0.10 each)

    Reads `ground_truth["keywords"]`. Returns (score, feedback).
    """
    score = 0.0
    feedback_parts = []

    issue_types = [i.issue_type.lower() for i in action.identified_issues]
    all_text = _issue_text(action.identified_issues) + " " + (action.explanation or "")

    # 1) Issue type
    if "logic_bug" in issue_types:
        score += 0.25
        feedback_parts.append("✅ Correctly identified as a logic bug (+0.25)")
    else:
        feedback_parts.append(f"❌ Expected 'logic_bug', got {issue_types} (+0.00)")

    # 2) Description accuracy
    kw_score = _keyword_score(all_text, ground_truth["keywords"])
    # Matching 40% of the keywords already earns the full 0.25.
    desc_pts = round(0.25 * min(kw_score * 2.5, 1.0), 3)
    score += desc_pts
    feedback_parts.append(
        f"{'✅' if desc_pts > 0.08 else '❌'} Description accuracy: "
        f"{desc_pts:.2f}/0.25 (keyword match {kw_score:.0%})"
    )

    # 3) Fix execution test
    fix_code = action.suggested_fix or ""
    if not fix_code.strip():
        feedback_parts.append("❌ No suggested fix provided (+0.00)")
    else:
        results = _run_is_palindrome(fix_code)
        if not results:
            feedback_parts.append("❌ Fix code could not be executed (+0.00)")
        else:
            passed = sum(1 for _inp, exp, got in results if got == exp)
            pts = round(0.50 * (passed / len(results)), 3)
            score += pts
            feedback_parts.append(
                f"{'✅' if passed == len(results) else '⚠️'} Fix passed "
                f"{passed}/{len(results)} test cases → +{pts:.2f}/0.50"
            )
            for inp, exp, got in results:
                status = "✅" if got == exp else "❌"
                feedback_parts.append(
                    f" {status} is_palindrome({inp!r}) → {got} (expected {exp})"
                )

    score = round(min(score, 1.0), 4)
    return score, "\n".join(feedback_parts)


# ─────────────────────────────────────────────────────────────
# Hard grader: security vulnerability detection
# ─────────────────────────────────────────────────────────────

def grade_hard(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade a security review against `ground_truth["vulnerabilities"]`.

    Rubric (total 1.0) — each vulnerability is worth 1/N of the score
    (~0.33 for the usual 3), split as:
      45% — an issue is flagged as a security vulnerability (0.15 of 0.33)
      30% — description mentions relevant keywords          (0.10 of 0.33)
      25% — fix mentions remediation keywords               (0.08 of 0.33)
    Bonus 0.05 when every vulnerability is identified in the description;
    the total is capped at 1.0.

    Each vulnerability entry supplies "name", "keywords" and "fix_keywords".
    Returns (0.0, message) when the ground truth lists no vulnerabilities.
    """
    vulns = ground_truth["vulnerabilities"]
    if not vulns:
        # Avoid dividing by zero below; nothing can be graded.
        return 0.0, "❌ No vulnerabilities defined in ground truth (+0.00)"

    per_vuln = 1.0 / len(vulns)
    flag_pts = per_vuln * 0.45
    fix_pts = per_vuln * 0.25

    all_issue_text = _issue_text(action.identified_issues) + " " + (action.explanation or "")
    fix_text = (action.suggested_fix or "") + " " + (action.explanation or "")

    # Loop-invariant: whether any reported issue is security-typed.
    # NOTE(review): this is not tied to a specific vulnerability, so a single
    # security-typed issue earns the "flagged" credit for every entry.
    # Confirm that this leniency is intended.
    has_security_issue = any(
        "security" in i.issue_type.lower() for i in action.identified_issues
    )

    total_score = 0.0
    feedback_parts = []
    found_count = 0

    for vuln in vulns:
        v_score = 0.0
        v_name = vuln["name"]
        feedback_parts.append(f"\n🔍 Checking: {v_name}")

        # a) issue_type == security_vulnerability
        if has_security_issue:
            v_score += flag_pts
            feedback_parts.append(f" ✅ Flagged as security vulnerability (+{flag_pts:.3f})")
        else:
            feedback_parts.append(" ❌ Not flagged as security vulnerability (+0.00)")

        # b) description keyword match
        if _keyword_hit(all_issue_text, vuln["keywords"]):
            kw_score_val = _keyword_score(all_issue_text, vuln["keywords"])
            # Matching a third of the keywords already earns the full share.
            pts = round(per_vuln * 0.30 * min(kw_score_val * 3, 1.0), 4)
            v_score += pts
            feedback_parts.append(f" ✅ Identified '{v_name}' in description (+{pts:.3f})")
            found_count += 1
        else:
            feedback_parts.append(f" ❌ '{v_name}' not mentioned in description (+0.00)")

        # c) fix keyword match
        if _keyword_hit(fix_text, vuln["fix_keywords"]):
            v_score += fix_pts
            feedback_parts.append(f" ✅ Fix addresses '{v_name}' (+{fix_pts:.3f})")
        else:
            feedback_parts.append(f" ❌ Fix doesn't address '{v_name}' (+0.00)")

        total_score += v_score

    # Bonus: every vulnerability was identified in the description.
    if found_count == len(vulns):
        total_score = min(total_score + 0.05, 1.0)
        feedback_parts.append(
            f"\n🎯 Bonus: All {len(vulns)} vulnerabilities identified! (+0.05)"
        )

    total_score = round(min(total_score, 1.0), 4)
    return total_score, "\n".join(feedback_parts)


# ─────────────────────────────────────────────────────────────
# Dispatcher
# ─────────────────────────────────────────────────────────────

def grade(task_id: str, action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """Route `action` to the grader registered for `task_id`.

    Raises:
        ValueError: if `task_id` has no grader.
    """
    graders = {
        "easy_syntax": grade_easy,
        "medium_logic": grade_medium,
        "hard_security": grade_hard,
    }
    grader = graders.get(task_id)
    if grader is None:
        raise ValueError(f"No grader for task: {task_id}")
    return grader(action, ground_truth)