openenv-python-env / server /static_review.py
uvpatel7271's picture
Upload folder using huggingface_hub
566a172 verified
"""Deterministic static-review helpers for arbitrary Python code.
Unlike the benchmark grader, this module does not compare against hidden rubric
items. Instead, it performs direct AST-based review on arbitrary snippets so it
can be used for manual testing, examples, and future dataset generation.
"""
from __future__ import annotations
import ast
from typing import List, Optional
try:
from models import DirectReviewResponse, ReviewFinding
except ModuleNotFoundError: # pragma: no cover
from ..models import DirectReviewResponse, ReviewFinding
class _StaticAnalyzer(ast.NodeVisitor):
"""AST visitor that emits structured review findings.
The visitor intentionally focuses on a small set of high-signal patterns so
the direct-review endpoint stays predictable and easy to understand.
"""
def __init__(self) -> None:
self.issues: List[ReviewFinding] = []
def visit_FunctionDef(self, node: ast.FunctionDef) -> None: # noqa: N802
"""Flag mutable default arguments in function definitions."""
for default in list(node.args.defaults):
if isinstance(default, (ast.List, ast.Dict, ast.Set)):
self.issues.append(
ReviewFinding(
title="Mutable default argument",
line=getattr(default, "lineno", node.lineno),
category="bug",
severity="warning",
rationale=(
"Mutable defaults persist across calls and can leak state "
"between unrelated requests."
),
recommendation="Use None as the default and create the object inside the function.",
rule_id="mutable-default-list",
)
)
self.generic_visit(node)
def visit_Call(self, node: ast.Call) -> None: # noqa: N802
"""Inspect function calls for obviously unsafe or noisy patterns."""
func_name = self._call_name(node)
if func_name in {"eval", "exec"}:
self.issues.append(
ReviewFinding(
title=f"Avoid {func_name} on untrusted input",
line=node.lineno,
category="security",
severity="critical",
rationale=(
f"{func_name} executes arbitrary code and is unsafe on "
"user-controlled input."
),
recommendation="Use a safe parser or a whitelist-based evaluator.",
rule_id="avoid-eval" if func_name == "eval" else "avoid-exec",
)
)
if func_name.endswith("check_output") or func_name.endswith("run"):
for keyword in node.keywords:
# `shell=True` is only a problem when the command comes from a
# shell-parsed string, but this heuristic is high value for
# review and intentionally conservative.
if keyword.arg == "shell" and isinstance(keyword.value, ast.Constant) and keyword.value.value is True:
self.issues.append(
ReviewFinding(
title="shell=True with dynamic input",
line=node.lineno,
category="security",
severity="critical",
rationale=(
"shell=True executes through the shell and can allow "
"command injection when the command string is interpolated."
),
recommendation="Pass a list of arguments and keep shell=False.",
rule_id="shell-true-command-injection",
)
)
if func_name == "print":
self.issues.append(
ReviewFinding(
title="Print statement in application logic",
line=node.lineno,
category="style",
severity="info",
rationale="Production services should prefer structured logging over print statements.",
recommendation="Use the logging module or return the value to the caller.",
rule_id="print-statement",
)
)
self.generic_visit(node)
def visit_ExceptHandler(self, node: ast.ExceptHandler) -> None: # noqa: N802
"""Flag bare exception handlers that hide failures."""
if node.type is None:
self.issues.append(
ReviewFinding(
title="Bare except",
line=node.lineno,
category="maintainability",
severity="warning",
rationale="Bare except catches KeyboardInterrupt and other system-level exceptions.",
recommendation="Catch a specific exception and record the failure.",
rule_id="bare-except",
)
)
self.generic_visit(node)
def visit_For(self, node: ast.For) -> None: # noqa: N802
"""Look for list-membership checks nested in loops."""
for child in ast.walk(node):
if isinstance(child, ast.Compare) and any(
isinstance(operator, (ast.In, ast.NotIn)) for operator in child.ops
):
if isinstance(child.comparators[0], ast.Name):
self.issues.append(
ReviewFinding(
title="Potential quadratic membership check inside loop",
line=child.lineno,
category="performance",
severity="warning",
rationale=(
"Repeated membership checks against a list inside a loop "
"can degrade to quadratic runtime."
),
recommendation="Use a set or dict for O(1) membership checks.",
rule_id="quadratic-membership-check",
)
)
break
self.generic_visit(node)
@staticmethod
def _call_name(node: ast.Call) -> str:
"""Extract a dotted function name such as `subprocess.run`."""
func = node.func
if isinstance(func, ast.Name):
return func.id
if isinstance(func, ast.Attribute):
prefix = _StaticAnalyzer._attribute_prefix(func.value)
return f"{prefix}.{func.attr}" if prefix else func.attr
return ""
@staticmethod
def _attribute_prefix(node: ast.AST) -> str:
"""Reconstruct the left-hand side of an attribute chain."""
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
prefix = _StaticAnalyzer._attribute_prefix(node.value)
return f"{prefix}.{node.attr}" if prefix else node.attr
return ""
def analyze_python_code(code: str) -> List[ReviewFinding]:
"""Analyze arbitrary Python code and return structured findings."""
if not code.strip():
return [
ReviewFinding(
title="No code provided",
category="bug",
severity="warning",
rationale="The reviewer cannot inspect an empty submission.",
recommendation="Provide Python source code.",
rule_id="empty-input",
)
]
# Syntax errors are turned into findings rather than exceptions so API
# consumers always get a valid response shape.
try:
tree = ast.parse(code)
except SyntaxError as exc:
return [
ReviewFinding(
title="Syntax error",
line=exc.lineno,
category="bug",
severity="critical",
rationale=exc.msg,
recommendation="Fix the syntax error before running static review.",
rule_id="syntax-error",
)
]
analyzer = _StaticAnalyzer()
analyzer.visit(tree)
return _deduplicate(analyzer.issues)
def build_direct_review_response(
code: str, context: Optional[str] = None
) -> DirectReviewResponse:
"""Build the public direct-review response for the `/review` route."""
issues = analyze_python_code(code)
weighted_penalty = 0.0
# The direct-review score is intentionally simple: more severe issues lower
# the score more aggressively.
for issue in issues:
if issue.severity == "critical":
weighted_penalty += 0.3
elif issue.severity == "warning":
weighted_penalty += 0.15
else:
weighted_penalty += 0.05
score = max(0.0, min(1.0, 1.0 - weighted_penalty))
summary = _build_summary(issues, context)
improved_code = _suggest_improved_code(code, issues)
return DirectReviewResponse(
issues=issues,
summary=summary,
score=score,
improved_code=improved_code,
)
def _build_summary(issues: List[ReviewFinding], context: Optional[str]) -> str:
"""Create a concise human-readable summary for the direct-review response."""
if not issues:
base = "No obvious issues were detected by the deterministic reviewer."
else:
critical = sum(1 for issue in issues if issue.severity == "critical")
warnings = sum(1 for issue in issues if issue.severity == "warning")
infos = sum(1 for issue in issues if issue.severity == "info")
base = (
f"Detected {len(issues)} issue(s): {critical} critical, "
f"{warnings} warning, {infos} info."
)
if context:
return f"{base} Context: {context}"
return base
def _suggest_improved_code(code: str, issues: List[ReviewFinding]) -> Optional[str]:
"""Append high-level fix directions to the submitted code."""
if not issues:
return None
suggestions = [issue.recommendation for issue in issues if issue.recommendation]
comment = " | ".join(dict.fromkeys(suggestions))
return f"{code.rstrip()}\n\n# Suggested review directions: {comment}"
def _deduplicate(findings: List[ReviewFinding]) -> List[ReviewFinding]:
"""Drop duplicate findings that refer to the same rule and line."""
seen = set()
unique: List[ReviewFinding] = []
for finding in findings:
key = (finding.rule_id, finding.line, finding.category)
if key in seen:
continue
seen.add(key)
unique.append(finding)
return unique