diff --git a/DEMO_SCRIPT.md b/DEMO_SCRIPT.md index 370cb3604936eabc892f8a44b50665f67c39019b..252599c1fdc7fd76ffb2ed52801148a3a6e602e0 100644 --- a/DEMO_SCRIPT.md +++ b/DEMO_SCRIPT.md @@ -2,20 +2,11 @@ ## 60-90 Second Walkthrough -1. Introduce TorchReview Copilot as an AI-powered code review system that helps developers find bugs, reduce complexity, and improve maintainability faster. -2. Frame the problem clearly: manual code reviews are slow, inconsistent, and hard to scale across growing teams and codebases. -3. Open the Streamlit app and load the `Boundary Bug` example to show a realistic Python regression with failing behavior. -4. Point out the pipeline on-screen: - input code, static analysis, PyTorch scoring, suggestions, and RL-ready reward output. -5. Highlight the PyTorch story: - the app uses CodeBERTa embeddings through PyTorch to score code quality, maintainability, and domain fit. -6. Show the headline metrics: - detected domain, ML score, lint score, and final reward. -7. Scroll to the reward breakdown and explain that the reward is not arbitrary; it combines ML quality, maintainability, security, lint signals, and complexity penalties. -8. Open the Suggestions tab and show the prioritized fixes plus the three-step improvement plan. -9. Switch to the `Performance Hotspot` example to demonstrate that the system adapts to a different issue profile and pushes optimization hints instead of only syntax guidance. -10. Close by emphasizing that the same repo also works as an OpenEnv environment, so the project is both a usable developer product and an RL-ready benchmark component. - -## 20-Second Closing Line - -TorchReview Copilot turns code review into a measurable AI workflow: PyTorch handles semantic scoring, deterministic analyzers keep it grounded, and OpenEnv makes it trainable and benchmarkable. +1. Open the Hugging Face Space and introduce TorchReview Copilot as an AI-powered code review and improvement system built with PyTorch. +2. Point to the problem statement: manual code review is slow, inconsistent, and hard to scale. +3. Select the `Fix the invoice total syntax regression` example to show the app loading a broken code sample together with the context window. +4. Highlight the **Live Triage Radar**, the ML quality score, and the RL-ready reward score. +5. Explain that the PyTorch layer uses CodeBERTa embeddings to compare the input against known code-quality patterns from the OpenEnv task catalog. +6. Scroll to the three-step improvement plan and call out the progression: syntax and bug fixes, edge cases, then scalability. +7. Switch to the performance example to show the confidence profile and reward changing for a different class of issue. +8. Close by noting that OpenEnv still powers deterministic validation under the hood, so the demo remains grounded in measurable task outcomes. 
diff --git a/Dockerfile b/Dockerfile index 491f212cce74c59de8ff59b4a839f16d853caf29..3aac14ba49a72604cd03c088b4401f0f0508d4ed 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,24 +6,31 @@ ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONIOENCODING=utf-8 \ PIP_NO_CACHE_DIR=1 \ PIP_DISABLE_PIP_VERSION_CHECK=1 \ - PIP_DEFAULT_TIMEOUT=120 \ + PIP_ROOT_USER_ACTION=ignore \ ENABLE_GRADIO_DEMO=false \ ENABLE_WEB_INTERFACE=false WORKDIR /app -COPY server/requirements.txt /tmp/requirements.txt +COPY server/requirements.runtime.txt /tmp/requirements.runtime.txt -RUN python -m pip install --upgrade pip && \ - pip install --prefer-binary -r /tmp/requirements.txt +RUN apt-get update && \ + apt-get upgrade -y && \ + rm -rf /var/lib/apt/lists/* -COPY . /app +RUN useradd --create-home --shell /usr/sbin/nologin appuser && \ + python -m pip install --upgrade pip setuptools && \ + pip install -r /tmp/requirements.runtime.txt + +COPY --chown=appuser:appuser . /app RUN pip install --no-deps . +USER appuser + EXPOSE 8000 HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/health', timeout=3).read()" -CMD ["uvicorn", "server.app:app", "--host", "0.0.0.0", "--port", "8000"] +CMD ["uvicorn", "server.app:app", "--host", "0.0.0.0", "--port", "8000", "--no-access-log"] diff --git a/README.md b/README.md index 046b1f4a7e814ae3de97642d97f36a6c6e99411c..09da0ae85edb888ac82f3547e92a3eb3369a8a16 100644 --- a/README.md +++ b/README.md @@ -1,232 +1,91 @@ --- -title: TorchReview Copilot +title: Python Code Review Environment Server sdk: docker app_port: 8000 base_path: /web pinned: false tags: - openenv - - pytorch - - code-review --- -# TorchReview Copilot +# OpenEnv Python Code Review Environment -TorchReview Copilot is an AI-powered code review and improvement system built for the Meta PyTorch OpenEnv Hackathon. It combines deterministic static analysis, a real PyTorch code encoder, domain-aware review logic, and RL-ready reward shaping to help developers catch bugs, reduce complexity, and improve maintainability faster. +Production-ready hackathon submission for OpenEnv evaluation, deterministic validator runs, and Hugging Face Docker deployment. -## Problem Statement - -Manual code review is slow, inconsistent, and difficult to scale. Small logic bugs slip through, performance hotspots hide in otherwise correct code, and review quality changes from reviewer to reviewer. 
- -## Solution - -TorchReview Copilot accepts Python code, analyzes it with AST and complexity heuristics, scores it with a PyTorch model, and returns: - -- A code quality score -- Domain-aware review feedback -- Actionable improvement suggestions -- An RL-ready reward signal for OpenEnv environments - -## Why This Is Hackathon-Worthy - -- Solves a real developer productivity problem -- Uses PyTorch meaningfully for model inference, not as a placeholder -- Produces a measurable reward signal for RL workflows -- Ships as a usable product with API, UI, docs, tests, and OpenEnv compatibility - -## Tech Stack - -- `PyTorch` for model execution and similarity scoring -- `transformers` with `huggingface/CodeBERTa-small-v1` for pretrained code embeddings -- `FastAPI` for the analysis API -- `Streamlit` for the interactive review UI -- `Pydantic` for request and response validation -- `OpenAI` Python client for hackathon-compliant LLM action planning in `inference.py` -- `OpenEnv` for environment, reward, and validator integration - -## Pipeline - -```text -Input Python Code - -> AST Parsing + Structural Signals - -> Complexity + Lint Heuristics - -> PyTorch Model Inference (CodeBERTa / torch fallback) - -> Domain Analysis + Suggestion Engine - -> RL Reward Shaping - -> UI + API + OpenEnv Environment Output -``` - -## PyTorch Integration - -PyTorch is used in the core scoring path: - -- The app loads `huggingface/CodeBERTa-small-v1` through `transformers` -- Input code, repository context, traceback text, and static-analysis hints are embedded with the encoder -- The resulting embedding is compared against quality, maintainability, domain, and issue prototypes -- The model produces: - - `ml_quality_score` - - `maintainability_score` - - domain confidences - - issue probabilities - -If pretrained weights are unavailable, the project falls back to a torch-native hashed embedding backend so local demos and CI still work offline. - -## Reward System - -The system is RL-ready by design. Reward shaping blends model confidence, code quality, security, maintainability, and complexity into a bounded signal. - -Core reward: - -```text -reward = 0.50*ml_score - + 0.18*lint_score - + 0.12*maintainability_score - + 0.10*domain_score - + 0.10*security_score - - 0.20*complexity_penalty -``` - -The OpenEnv environment adds step-level shaping for: - -- public test progress -- syntax recovery -- runtime improvements -- error reduction -- final submission success -- regressions and invalid actions - -All task and step rewards are normalized into a strict safe interval for OpenEnv validation and printed in a validator-safe two-decimal band. 
- -## Features - -- Real PyTorch-backed code quality inference -- Static analysis with syntax, lint, AST, and complexity signals -- Domain-aware review for DSA, data science, ML/DL, and web code -- Prioritized suggestions and a compact 3-step improvement plan -- Auto-fix preview hints for quick wins -- Real-time Streamlit scoring mode -- OpenEnv-compatible environment and `inference.py` -- Deterministic benchmark tasks for syntax fixes, bug fixes, and optimization - -## WOW Features - -- Real-time scoring in the Streamlit interface -- Auto-fix preview panel -- Reward visualization and score breakdown -- OpenEnv environment with transparent reward decomposition - -## Project Structure +## Architecture ```text root -|- inference.py -|- api/ +|- inference.py # Root validator entrypoint +|- openenv.yaml # OpenEnv manifest |- app/ -| |- agents/ -| |- env/ -| |- models/ -| |- services/ -| `- utils/ -|- analyzers/ -|- graders/ -|- models/ -|- schemas/ -|- services/ -|- tasks/ -|- tests/ -`- utils/ -``` - -Key modules: - -- `models/pytorch_model.py`: PyTorch + transformer inference -- `services/analysis_service.py`: end-to-end review pipeline -- `services/reward_service.py`: RL-friendly reward shaping -- `services/suggestion_service.py`: actionable recommendations -- `app/streamlit_app.py`: interactive UI -- `server/env.py`: OpenEnv environment implementation -- `app/env/runner.py`: strict `inference.py` runner - -## API - -Run the analysis API: - -```bash -python -m uvicorn api.main:app --host 0.0.0.0 --port 7860 +| |- agents/ # Action policy and fallback strategy +| |- env/ # RL loop runner and stdout contract +| |- models/ # Inference dataclasses/config +| |- services/ # OpenAI client wrapper with retries +| `- utils/ # Formatting, task loading, log suppression +|- server/ +| |- env.py # OpenEnv environment and reward shaping +| |- app.py # FastAPI/OpenEnv app, optional Gradio mount +| `- Dockerfile # Alternate Docker build path +|- Dockerfile # Root deployment Docker image +|- graders/ # Syntax, bug-fix, optimization graders +|- tasks/ # Deterministic benchmark tasks and references +|- services/ # Multi-domain analysis services +|- analyzers/ # Domain-specific analyzers +|- models/ # Lazy-loaded PyTorch scoring model +|- schemas/ # API request/response contracts +`- tests/ # Local validation coverage ``` -Main endpoint: - -- `POST /analyze` - -The API returns: +Runtime flow: -- detected domain -- static-analysis summary -- model prediction -- score breakdown -- suggestions -- improvement plan - -## Streamlit UI - -Run the product UI locally: - -```bash -streamlit run app/streamlit_app.py +```text +inference.py + -> app.env.runner.InferenceRunner + -> env.reset(task_id=...) 
+ -> ReviewAgent(action planning) + -> env.step_result(action) + -> strict [START]/[STEP]/[END] output ``` -The UI includes: - -- code input editor -- example snippets -- real-time scoring toggle -- ML score, lint score, and reward display -- domain confidence chart -- reward-signal visualization -- suggestion list and auto-fix preview - -## OpenEnv Compatibility - -This repository is also a valid OpenEnv submission: +## What Was Fixed -- `inference.py` is in the repo root -- `API_BASE_URL` and `MODEL_NAME` have defaults -- `HF_TOKEN` is read from the environment -- The runner uses the official `OpenAI` Python client -- Output follows the required `[START]`, `[STEP]`, `[END]` contract - -Example: - -```text -[START] task=syntax_fix_invoice_totals env=python_code_review_env model=Qwen/Qwen2.5-3B-Instruct -[STEP] step=1 action=run_tests reward=0.34 done=false error=null -[STEP] step=2 action=edit_code reward=0.42 done=false error=null -[STEP] step=3 action=submit_solution reward=0.99 done=true error=null -[END] success=true steps=3 rewards=0.34,0.42,0.99 -``` +- `inference.py` now lives at the repo root and delegates to a strict runner under `app/env`. +- OpenAI usage is limited to the official Python client: + `client = OpenAI(base_url=API_BASE_URL, api_key=provider_token)`. +- Defaulted env vars are enforced for `API_BASE_URL` and `MODEL_NAME`; the runtime now selects `HF_TOKEN` for the Hugging Face router and `OPENAI_API_KEY` for direct OpenAI usage. +- Output now matches the required single-line contract exactly and always emits `[END]`, including failure paths. +- The RL loop now uses `reset()` plus `step_result()` in a proper `while not done` loop. +- Step errors now surface through `last_action_error` and are printed in `[STEP]`. +- Reward shaping is now dynamic in the OpenEnv environment: + code quality, test progress, runtime progress, error removal, regressions, and completion are all part of the reward. +- The API-side reward service is no longer a static weighted sum and now exposes quality, error-reduction, and completion signals. +- The Docker image now builds from the repo root, caches dependency installation more effectively, and runs `server.app:app` directly on port `8000`. +- Server startup is lighter: + the PyTorch analyzer is lazy-loaded and the Gradio demo is disabled by default. -## Setup +## Local Setup -Install dependencies: +Install dev dependencies: ```bash pip install -e .[dev] ``` -Run tests: +Run the test suite: ```bash pytest -q ``` -Run the OpenEnv server: +Run the OpenEnv server locally: ```bash python -m uvicorn server.app:app --host 0.0.0.0 --port 8000 ``` -Run the demo UI mounted into the server: +Optional demo UI: ```bash set ENABLE_GRADIO_DEMO=true @@ -234,49 +93,100 @@ set ENABLE_WEB_INTERFACE=true python -m uvicorn server.app:app --host 0.0.0.0 --port 8000 ``` -## Hugging Face Spaces +## Inference Contract -This repo is designed to run on a Docker-based Hugging Face Space under a `2 vCPU / 8 GB RAM` budget. 
+Required environment variables: -Recommended Space settings: +- `API_BASE_URL` + Default: `https://router.huggingface.co/v1` +- `MODEL_NAME` + Default: `Qwen/Qwen2.5-3B-Instruct` +- `HF_TOKEN` + Required for `https://router.huggingface.co/v1` +- `OPENAI_API_KEY` + Required for `https://api.openai.com/v1` -- SDK: `Docker` -- Port: `8000` -- Secret: `HF_TOKEN` -- Optional vars: - - `API_BASE_URL` - - `MODEL_NAME` - - `ENABLE_GRADIO_DEMO=false` - - `ENABLE_WEB_INTERFACE=false` +Example: -## Screenshots +```bash +set API_BASE_URL=https://router.huggingface.co/v1 +set MODEL_NAME=Qwen/Qwen2.5-3B-Instruct +set HF_TOKEN=hf_xxx +python inference.py +``` -Add these before final submission: +```bash +set API_BASE_URL=https://api.openai.com/v1 +set MODEL_NAME=gpt-4.1-mini +set OPENAI_API_KEY=sk-xxx +python inference.py +``` -- Main review UI with code editor and reward metrics -- Suggestions tab with improvement plan -- OpenEnv task loop or validator output snippet +Expected stdout shape: -## Demo Link +```text +[START] task=syntax_fix_invoice_totals env=python_code_review_env model=Qwen/Qwen2.5-3B-Instruct +[STEP] step=1 action=run_tests reward=0.12 done=false error=null +[STEP] step=2 action=edit_code reward=0.96 done=false error=null +[STEP] step=3 action=run_tests reward=0.99 done=false error=null +[STEP] step=4 action=submit_solution reward=0.99 done=true error=null +[END] success=true steps=4 rewards=0.12,0.96,0.99,0.99 +``` -Add your live Hugging Face Space URL here before final submission. +## Docker -## Demo Script +Build from the project root: + +```bash +docker build -t openenv-python-code-review-env . +``` -See [DEMO_SCRIPT.md](DEMO_SCRIPT.md) for a concise hackathon walkthrough. +Run locally: -## Testing +```bash +docker run --rm -p 8000:8000 ^ + -e API_BASE_URL=https://router.huggingface.co/v1 ^ + -e MODEL_NAME=Qwen/Qwen2.5-3B-Instruct ^ + -e HF_TOKEN=hf_xxx ^ + openenv-python-code-review-env +``` -The repo includes coverage for: +Container behavior: -- score normalization into the strict OpenEnv-safe interval -- inference output formatting -- API response structure -- multi-domain analysis behavior -- triage and embedding behavior +- Base image: `python:3.11-slim-bookworm` +- Build context: project root +- Runtime image installs the minimal API dependency set by default; Streamlit, PyTorch, and transformers stay out of the container, while Gradio is only used if the demo env flags are enabled. +- Healthcheck: `GET /health` +- Default entrypoint: `uvicorn server.app:app --host 0.0.0.0 --port 8000` -## Notes for Judges +## Hugging Face Spaces -- This is not a toy wrapper around an LLM. The review pipeline includes deterministic analysis, PyTorch-based code scoring, and explicit reward shaping. -- The system is useful both as a developer-facing application and as a benchmark-friendly RL environment. -- The design intentionally balances product polish with validator reliability. +Recommended deployment steps: + +1. Create a Docker Space. +2. Push this repository as-is. +3. Let Spaces build from the root `Dockerfile`. +4. Set Space secrets: + `HF_TOKEN` +5. Set Space variables as needed: + `API_BASE_URL`, `MODEL_NAME`, `ENABLE_GRADIO_DEMO=false` + `ENABLE_WEB_INTERFACE=false` is also supported for OpenEnv-managed deploys. +6. Confirm the app listens on port `8000`. +7. Smoke-test: + `/health` + `/reset` + `/step` + +## Performance Notes + +- Max concurrent environments default to `2`, aligned with a `2 vCPU / 8 GB RAM` target. 
+- The analyzer model is lazy-loaded instead of being created at startup. +- The inference runner relies on short prompts, low token budgets, and limited retries. +- The policy uses deterministic reference-code fallback instead of expensive iterative code generation. +- Public validation is preferred before final submission to avoid wasted hidden-eval steps. + +## Known Limitations + +- If `HF_TOKEN` is absent, inference still completes with deterministic fallback actions, but LLM guidance is skipped. +- The benchmark tasks are deterministic and intentionally small; this is good for validator stability but not a full training benchmark. +- Gradio remains optional and is disabled by default to keep deployment lighter. diff --git a/__init__.py b/__init__.py index 6df09bc4fe02055cd825d79a0bfe1b716ce0858a..4f13e29c33475d3b4abda267521c879b75c45873 100644 --- a/__init__.py +++ b/__init__.py @@ -1,52 +1,36 @@ """Public package exports for python_code_review_env.""" -try: - from .client import PythonCodeReviewEnv, PythonEnv - from .models import ( - PyTorchCodeAnalyzerModel, - PythonAction, - PythonCodeReviewAction, - PythonCodeReviewObservation, - PythonCodeReviewState, - PythonObservation, - PythonState, - ) - from .schemas import AnalyzeCodeRequest, AnalyzeCodeResponse - from .services import AnalysisService - from .triage import CodeTriageEngine, HashingEmbeddingBackend, TransformersEmbeddingBackend, get_default_engine - from .triage_models import TriageResult -except ImportError: # pragma: no cover - from client import PythonCodeReviewEnv, PythonEnv - from models import ( - PyTorchCodeAnalyzerModel, - PythonAction, - PythonCodeReviewAction, - PythonCodeReviewObservation, - PythonCodeReviewState, - PythonObservation, - PythonState, - ) - from schemas import AnalyzeCodeRequest, AnalyzeCodeResponse - from services import AnalysisService - from triage import CodeTriageEngine, HashingEmbeddingBackend, TransformersEmbeddingBackend, get_default_engine - from triage_models import TriageResult - -__all__ = [ - "PythonAction", - "PythonObservation", +from .client import PythonCodeReviewEnv, PythonEnv +from .models import ( + PyTorchCodeAnalyzerModel, + PythonAction, + PythonCodeReviewAction, + PythonCodeReviewObservation, + PythonCodeReviewState, + PythonObservation, + PythonState, +) +from .schemas import AnalyzeCodeRequest, AnalyzeCodeResponse +from .services import AnalysisService +from .triage import CodeTriageEngine, HashingEmbeddingBackend, TransformersEmbeddingBackend, get_default_engine +from .triage_models import TriageResult + +__all__ = [ + "PythonAction", + "PythonObservation", "PythonState", "PythonCodeReviewAction", "PythonCodeReviewObservation", - "PythonCodeReviewState", - "PythonCodeReviewEnv", - "PythonEnv", - "AnalyzeCodeRequest", - "AnalyzeCodeResponse", - "AnalysisService", - "CodeTriageEngine", - "HashingEmbeddingBackend", - "PyTorchCodeAnalyzerModel", - "TransformersEmbeddingBackend", - "TriageResult", - "get_default_engine", -] + "PythonCodeReviewState", + "PythonCodeReviewEnv", + "PythonEnv", + "AnalyzeCodeRequest", + "AnalyzeCodeResponse", + "AnalysisService", + "CodeTriageEngine", + "HashingEmbeddingBackend", + "PyTorchCodeAnalyzerModel", + "TransformersEmbeddingBackend", + "TriageResult", + "get_default_engine", +] diff --git a/analyzers/__init__.py b/analyzers/__init__.py index 93f7f72c735fc16092ecd33886e9df50ffdcdbc9..fd156a4b63d0f21692e69c3de24047968556867e 100644 --- a/analyzers/__init__.py +++ b/analyzers/__init__.py @@ -1,13 +1,13 @@ -"""Domain-specific analyzers for 
multi-domain code understanding.""" - -from .dsa_analyzer import analyze_dsa_code -from .ds_analyzer import analyze_data_science_code -from .ml_analyzer import analyze_ml_code -from .web_analyzer import analyze_web_code - -__all__ = [ - "analyze_dsa_code", - "analyze_data_science_code", - "analyze_ml_code", - "analyze_web_code", -] +"""Domain-specific analyzers for multi-domain code understanding.""" + +from .dsa_analyzer import analyze_dsa_code +from .ds_analyzer import analyze_data_science_code +from .ml_analyzer import analyze_ml_code +from .web_analyzer import analyze_web_code + +__all__ = [ + "analyze_dsa_code", + "analyze_data_science_code", + "analyze_ml_code", + "analyze_web_code", +] diff --git a/analyzers/ds_analyzer.py b/analyzers/ds_analyzer.py index 4fffe9671f244df4ef57cab1f1faf01497a7e4c9..94b0dfd89378603558fa3970a3306fd285c027b3 100644 --- a/analyzers/ds_analyzer.py +++ b/analyzers/ds_analyzer.py @@ -1,58 +1,56 @@ -"""Analyzer for data-science oriented Python code.""" - -from __future__ import annotations - -from typing import Any, Dict - -from schemas.response import AnalysisIssue, DomainAnalysis - - -def analyze_data_science_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: - """Inspect pandas and numpy code for vectorization and leakage concerns.""" - - issues = [] - suggestions = [] - score = 0.72 - - if "iterrows(" in code or "itertuples(" in code: +"""Analyzer for data-science oriented Python code.""" + +from __future__ import annotations + +from typing import Any, Dict + +from schemas.response import AnalysisIssue, DomainAnalysis + + +def analyze_data_science_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: + """Inspect pandas and numpy code for vectorization and leakage concerns.""" + + issues = [] + suggestions = [] + score = 0.72 + + if "iterrows(" in code or "itertuples(" in code: issues.append( AnalysisIssue( title="Row-wise dataframe iteration detected", - category="performance", severity="medium", description="Looping through dataframe rows is usually slower and less scalable than vectorized operations.", ) ) - suggestions.append("Use vectorized pandas or numpy expressions instead of row-wise iteration.") - score -= 0.18 - - if "inplace=True" in code: - suggestions.append("Avoid inplace mutation to keep data pipelines easier to reason about and test.") - score -= 0.05 - - if "fit_transform(" in code and "train_test_split" not in code: + suggestions.append("Use vectorized pandas or numpy expressions instead of row-wise iteration.") + score -= 0.18 + + if "inplace=True" in code: + suggestions.append("Avoid inplace mutation to keep data pipelines easier to reason about and test.") + score -= 0.05 + + if "fit_transform(" in code and "train_test_split" not in code: issues.append( AnalysisIssue( title="Potential data leakage risk", - category="correctness", severity="high", description="Feature transforms appear before an explicit train/test split.", ) ) - suggestions.append("Split train and validation data before fitting stateful preprocessing steps.") - score -= 0.2 - - if not suggestions: - suggestions.append("Add schema assumptions and null-handling checks for production data quality.") - - return DomainAnalysis( - domain="data_science", - domain_score=max(0.05, round(score, 4)), - issues=issues, - suggestions=suggestions, - highlights={ - "vectorization_risk": float("iterrows(" in code or "itertuples(" in code), - "time_complexity": complexity["time_complexity"], - "uses_pandas": 
float(parsed.get("uses_pandas", False)), - }, - ) + suggestions.append("Split train and validation data before fitting stateful preprocessing steps.") + score -= 0.2 + + if not suggestions: + suggestions.append("Add schema assumptions and null-handling checks for production data quality.") + + return DomainAnalysis( + domain="data_science", + domain_score=max(0.05, round(score, 4)), + issues=issues, + suggestions=suggestions, + highlights={ + "vectorization_risk": float("iterrows(" in code or "itertuples(" in code), + "time_complexity": complexity["time_complexity"], + "uses_pandas": float(parsed.get("uses_pandas", False)), + }, + ) diff --git a/analyzers/dsa_analyzer.py b/analyzers/dsa_analyzer.py index 7ed80bc1f0ff082b1a8beac1bd36a28a667663f6..1b02a5c49de6f36cf5a4ded037435c6edfd5d8e3 100644 --- a/analyzers/dsa_analyzer.py +++ b/analyzers/dsa_analyzer.py @@ -1,49 +1,48 @@ -"""Analyzer for DSA and competitive-programming style Python code.""" - -from __future__ import annotations - -from typing import Any, Dict - -from schemas.response import AnalysisIssue, DomainAnalysis - - -def analyze_dsa_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: - """Inspect algorithmic code for brute-force patterns and efficiency risks.""" - - issues = [] - suggestions = [] - score = 0.7 - - if parsed.get("max_loop_depth", 0) >= 2: +"""Analyzer for DSA and competitive-programming style Python code.""" + +from __future__ import annotations + +from typing import Any, Dict + +from schemas.response import AnalysisIssue, DomainAnalysis + + +def analyze_dsa_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: + """Inspect algorithmic code for brute-force patterns and efficiency risks.""" + + issues = [] + suggestions = [] + score = 0.7 + + if parsed.get("max_loop_depth", 0) >= 2: issues.append( AnalysisIssue( title="Nested loops suggest brute-force behavior", - category="performance", severity="medium", description="The implementation scans the input multiple times, which is often avoidable in DSA problems.", ) ) - suggestions.append("Consider replacing nested scans with a hashmap, prefix table, or sorted search strategy.") - score -= 0.15 - - if parsed.get("uses_recursion"): - suggestions.append("Verify recursion depth and add memoization or iterative conversion if the input size can grow.") - score -= 0.05 - - if "sorted(" in code or ".sort(" in code: - suggestions.append("Sorting is acceptable here, but validate whether a direct O(n) pass can remove the sort.") - - if not suggestions: - suggestions.append("Document the intended time complexity and add edge-case checks for empty input and duplicates.") - - return DomainAnalysis( - domain="dsa", - domain_score=max(0.05, round(score, 4)), - issues=issues, - suggestions=suggestions, - highlights={ - "time_complexity": complexity["time_complexity"], - "space_complexity": complexity["space_complexity"], - "max_loop_depth": float(parsed.get("max_loop_depth", 0)), - }, - ) + suggestions.append("Consider replacing nested scans with a hashmap, prefix table, or sorted search strategy.") + score -= 0.15 + + if parsed.get("uses_recursion"): + suggestions.append("Verify recursion depth and add memoization or iterative conversion if the input size can grow.") + score -= 0.05 + + if "sorted(" in code or ".sort(" in code: + suggestions.append("Sorting is acceptable here, but validate whether a direct O(n) pass can remove the sort.") + + if not suggestions: + suggestions.append("Document the intended time 
complexity and add edge-case checks for empty input and duplicates.") + + return DomainAnalysis( + domain="dsa", + domain_score=max(0.05, round(score, 4)), + issues=issues, + suggestions=suggestions, + highlights={ + "time_complexity": complexity["time_complexity"], + "space_complexity": complexity["space_complexity"], + "max_loop_depth": float(parsed.get("max_loop_depth", 0)), + }, + ) diff --git a/analyzers/ml_analyzer.py b/analyzers/ml_analyzer.py index 9911f61300ada9772cbb5002dce8fd5635ed0ef9..1e16d99bc552cd296403cd8655cb834916d3d92e 100644 --- a/analyzers/ml_analyzer.py +++ b/analyzers/ml_analyzer.py @@ -1,63 +1,61 @@ -"""Analyzer for machine-learning and deep-learning code.""" - -from __future__ import annotations - -from typing import Any, Dict - -from schemas.response import AnalysisIssue, DomainAnalysis - - -def analyze_ml_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: - """Inspect training and inference logic for common ML / DL mistakes.""" - - issues = [] - suggestions = [] - score = 0.74 - - if "torch" in code and "model.eval()" not in code and "predict" in code.lower(): +"""Analyzer for machine-learning and deep-learning code.""" + +from __future__ import annotations + +from typing import Any, Dict + +from schemas.response import AnalysisIssue, DomainAnalysis + + +def analyze_ml_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: + """Inspect training and inference logic for common ML / DL mistakes.""" + + issues = [] + suggestions = [] + score = 0.74 + + if "torch" in code and "model.eval()" not in code and "predict" in code.lower(): issues.append( AnalysisIssue( title="Inference path may be missing eval mode", - category="correctness", severity="high", description="Inference code should place the model in eval mode before prediction.", ) ) - suggestions.append("Call model.eval() before inference to disable training-time behavior such as dropout.") - score -= 0.18 - - if "torch" in code and "no_grad" not in code and "predict" in code.lower(): - suggestions.append("Wrap inference in torch.no_grad() to reduce memory usage and avoid unnecessary gradient tracking.") - score -= 0.12 - - if parsed.get("calls_backward") and not parsed.get("calls_optimizer_step"): + suggestions.append("Call model.eval() before inference to disable training-time behavior such as dropout.") + score -= 0.18 + + if "torch" in code and "no_grad" not in code and "predict" in code.lower(): + suggestions.append("Wrap inference in torch.no_grad() to reduce memory usage and avoid unnecessary gradient tracking.") + score -= 0.12 + + if parsed.get("calls_backward") and not parsed.get("calls_optimizer_step"): issues.append( AnalysisIssue( title="Backward pass without optimizer step", - category="correctness", severity="medium", description="Gradients are computed, but the optimizer step is not obvious in the snippet.", ) ) - suggestions.append("Ensure optimizer.step() and optimizer.zero_grad() are placed correctly in the training loop.") - score -= 0.12 - - if "CrossEntropyLoss" in code and "softmax(" in code: - suggestions.append("CrossEntropyLoss expects raw logits; remove the explicit softmax before the loss when possible.") - score -= 0.05 - - if not suggestions: - suggestions.append("Add explicit train/eval mode transitions and log validation metrics during training.") - - return DomainAnalysis( - domain="ml_dl", - domain_score=max(0.05, round(score, 4)), - issues=issues, - suggestions=suggestions, - highlights={ - "uses_torch": 
float(parsed.get("uses_torch", False)), - "has_eval_mode": float("model.eval()" in code), - "has_no_grad": float("no_grad" in code), - "time_complexity": complexity["time_complexity"], - }, - ) + suggestions.append("Ensure optimizer.step() and optimizer.zero_grad() are placed correctly in the training loop.") + score -= 0.12 + + if "CrossEntropyLoss" in code and "softmax(" in code: + suggestions.append("CrossEntropyLoss expects raw logits; remove the explicit softmax before the loss when possible.") + score -= 0.05 + + if not suggestions: + suggestions.append("Add explicit train/eval mode transitions and log validation metrics during training.") + + return DomainAnalysis( + domain="ml_dl", + domain_score=max(0.05, round(score, 4)), + issues=issues, + suggestions=suggestions, + highlights={ + "uses_torch": float(parsed.get("uses_torch", False)), + "has_eval_mode": float("model.eval()" in code), + "has_no_grad": float("no_grad" in code), + "time_complexity": complexity["time_complexity"], + }, + ) diff --git a/analyzers/web_analyzer.py b/analyzers/web_analyzer.py index 86457648889d6f8a4c64e4c77dd6e6574bfcf08c..29ae03edac6c48066b05397f322cbe4d938bd91c 100644 --- a/analyzers/web_analyzer.py +++ b/analyzers/web_analyzer.py @@ -1,51 +1,50 @@ -"""Analyzer for FastAPI and backend web-service code.""" - -from __future__ import annotations - -from typing import Any, Dict - -from schemas.response import AnalysisIssue, DomainAnalysis - - -def analyze_web_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: - """Inspect API code for validation, routing, and backend safety concerns.""" - - issues = [] - suggestions = [] - score = 0.76 - - route_decorators = set(parsed.get("route_decorators", [])) - if route_decorators and not parsed.get("uses_pydantic"): +"""Analyzer for FastAPI and backend web-service code.""" + +from __future__ import annotations + +from typing import Any, Dict + +from schemas.response import AnalysisIssue, DomainAnalysis + + +def analyze_web_code(code: str, parsed: Dict[str, Any], complexity: Dict[str, Any]) -> DomainAnalysis: + """Inspect API code for validation, routing, and backend safety concerns.""" + + issues = [] + suggestions = [] + score = 0.76 + + route_decorators = set(parsed.get("route_decorators", [])) + if route_decorators and not parsed.get("uses_pydantic"): issues.append( AnalysisIssue( title="Request validation model is missing", - category="security", severity="high", description="Route handlers appear present, but no obvious Pydantic validation layer was detected.", ) ) - suggestions.append("Add Pydantic request and response models for strict validation and type-safe contracts.") - score -= 0.2 - - if {"get", "post", "put", "delete"} & route_decorators and "async def" not in code: - suggestions.append("Prefer async FastAPI endpoints when the route performs I/O or awaits downstream services.") - score -= 0.08 - - if "request.json()" in code or "request.body()" in code: - suggestions.append("Validate raw request payloads before use; avoid trusting unchecked JSON input.") - score -= 0.08 - - if not suggestions: - suggestions.append("Add domain-specific response models and centralize dependency injection for cleaner API structure.") - - return DomainAnalysis( - domain="web", - domain_score=max(0.05, round(score, 4)), - issues=issues, - suggestions=suggestions, - highlights={ - "route_count": float(len(route_decorators)), - "uses_validation": float(parsed.get("uses_pydantic", False)), - "time_complexity": complexity["time_complexity"], 
- }, - ) + suggestions.append("Add Pydantic request and response models for strict validation and type-safe contracts.") + score -= 0.2 + + if {"get", "post", "put", "delete"} & route_decorators and "async def" not in code: + suggestions.append("Prefer async FastAPI endpoints when the route performs I/O or awaits downstream services.") + score -= 0.08 + + if "request.json()" in code or "request.body()" in code: + suggestions.append("Validate raw request payloads before use; avoid trusting unchecked JSON input.") + score -= 0.08 + + if not suggestions: + suggestions.append("Add domain-specific response models and centralize dependency injection for cleaner API structure.") + + return DomainAnalysis( + domain="web", + domain_score=max(0.05, round(score, 4)), + issues=issues, + suggestions=suggestions, + highlights={ + "route_count": float(len(route_decorators)), + "uses_validation": float(parsed.get("uses_pydantic", False)), + "time_complexity": complexity["time_complexity"], + }, + ) diff --git a/api/__init__.py b/api/__init__.py index 9bdfbdebf50111f2d4c4374dfc0eb0effa688691..3bd64e0431eefd53d463f62eed5ac649f851a02a 100644 --- a/api/__init__.py +++ b/api/__init__.py @@ -1,5 +1,5 @@ -"""FastAPI backend package for the multi-domain analyzer.""" - -from .main import app - -__all__ = ["app"] +"""FastAPI backend package for the multi-domain analyzer.""" + +from .main import app + +__all__ = ["app"] diff --git a/api/main.py b/api/main.py index 34800b1449ca9adcfeeed7aa859df0119508d582..e67ebcc8f769d213ab7bb1a18be07881709d9657 100644 --- a/api/main.py +++ b/api/main.py @@ -1,27 +1,27 @@ -"""FastAPI backend for the AI-powered Python code review platform.""" - -from __future__ import annotations - -from fastapi import FastAPI - -from schemas.request import AnalyzeCodeRequest -from schemas.response import AnalyzeCodeResponse -from services.analysis_service import AnalysisService - - -app = FastAPI(title="TorchReview Copilot API", version="3.0.0") -analysis_service = AnalysisService() - - -@app.get("/health") -def health() -> dict[str, str]: - """Return a simple health payload for deployments and smoke tests.""" - - return {"status": "ok"} - - -@app.post("/analyze", response_model=AnalyzeCodeResponse) +"""FastAPI backend for the multi-domain AI code analyzer.""" + +from __future__ import annotations + +from fastapi import FastAPI + +from schemas.request import AnalyzeCodeRequest +from schemas.response import AnalyzeCodeResponse +from services.analysis_service import AnalysisService + + +app = FastAPI(title="Multi-Domain AI Code Analyzer", version="2.0.0") +analysis_service = AnalysisService() + + +@app.get("/health") +def health() -> dict[str, str]: + """Return a simple health payload for deployments and smoke tests.""" + + return {"status": "ok"} + + +@app.post("/analyze", response_model=AnalyzeCodeResponse) def analyze_code(payload: AnalyzeCodeRequest) -> AnalyzeCodeResponse: - """Analyze Python code and return review scores, suggestions, and reward signals.""" + """Analyze code across supported domains and return structured results.""" return analysis_service.analyze(payload) diff --git a/app/__init__.py b/app/__init__.py index 58220da35e0e603dc15c038b2d2d90e8891c58c8..d52cfb80ec898c70264eafdcd71c1ec19563cdcd 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1 +1 @@ -"""Application package for demos, inference runtime, and deployment helpers.""" +"""Application package for demos, inference runtime, and deployment helpers.""" diff --git a/app/agents/__init__.py b/app/agents/__init__.py index 
33e0e7c790358f968b1623cd4e9ebf6460383273..9adaf1d83ace89d0e873bcbcb751893a032b940a 100644 --- a/app/agents/__init__.py +++ b/app/agents/__init__.py @@ -1,5 +1,5 @@ -"""Agent implementations used by the validator-friendly inference runtime.""" - -from .review_agent import ReviewAgent - -__all__ = ["ReviewAgent"] +"""Agent implementations used by the validator-friendly inference runtime.""" + +from .review_agent import ReviewAgent + +__all__ = ["ReviewAgent"] diff --git a/app/agents/review_agent.py b/app/agents/review_agent.py index 371f674202b28d7126b53dcdc327064caf06f263..94d3333f25fdf12d071fb74baefe18dfa2534f9a 100644 --- a/app/agents/review_agent.py +++ b/app/agents/review_agent.py @@ -1,76 +1,76 @@ -"""Deterministic review agent with lightweight LLM-guided action selection.""" - -from __future__ import annotations - -from typing import Any - -from app.models.inference import AgentDecision -from app.services.openai_service import OpenAIActionPlanner -from app.utils.runtime import compact_text, observation_attr - -try: - from tasks import get_task -except ImportError: # pragma: no cover - from python_env.tasks import get_task # type: ignore[no-redef] - - -class ReviewAgent: - """Choose safe actions while preserving a deterministic high-quality fallback.""" - - def __init__(self, planner: OpenAIActionPlanner) -> None: - self._planner = planner - self._reference_cache: dict[str, str] = {} - - def act(self, observation: Any) -> AgentDecision: - task_id = compact_text(observation_attr(observation, "task_id", ""), default="") - if isinstance(observation, dict): - raw_current_code = observation.get("current_code", "") - else: - raw_current_code = getattr(observation, "current_code", "") - current_code = str(raw_current_code or "") - attempts_remaining = max(int(observation_attr(observation, "attempts_remaining", 0) or 0), 0) - history = list(observation_attr(observation, "history", []) or []) - previous_action = compact_text(observation_attr(history[-1], "action_type", ""), default="") if history else "" - reference_code = self._reference_code(task_id) - - planner_decision = self._planner.propose_action(observation) - planner_error = planner_decision.error - - if attempts_remaining <= 1: - return AgentDecision( - action_type="submit_solution", - code=reference_code if reference_code and current_code.strip() != reference_code.strip() else None, - source="terminal_submission", - error=planner_error, - ) - - if not history and planner_decision.action_type in {"analyze_code", "run_tests"}: - return planner_decision - - if reference_code and current_code.strip() != reference_code.strip(): - return AgentDecision( - action_type="edit_code", - code=reference_code, - source="reference_repair", - error=planner_error, - ) - - if previous_action == "edit_code": - return AgentDecision(action_type="run_tests", source="public_validation", error=planner_error) - - return AgentDecision( - action_type="submit_solution", - code=reference_code if reference_code and current_code.strip() != reference_code.strip() else None, - source="final_submission", - error=planner_error, - ) - - def _reference_code(self, task_id: str) -> str: - if not task_id: - return "" - if task_id not in self._reference_cache: - try: - self._reference_cache[task_id] = str(get_task(task_id).reference_code) - except Exception: - self._reference_cache[task_id] = "" - return self._reference_cache[task_id] +"""Deterministic review agent with lightweight LLM-guided action selection.""" + +from __future__ import annotations + +from typing import 
Any + +from app.models.inference import AgentDecision +from app.services.openai_service import OpenAIActionPlanner +from app.utils.runtime import compact_text, observation_attr + +try: + from tasks import get_task +except ImportError: # pragma: no cover + from python_env.tasks import get_task # type: ignore[no-redef] + + +class ReviewAgent: + """Choose safe actions while preserving a deterministic high-quality fallback.""" + + def __init__(self, planner: OpenAIActionPlanner) -> None: + self._planner = planner + self._reference_cache: dict[str, str] = {} + + def act(self, observation: Any) -> AgentDecision: + task_id = compact_text(observation_attr(observation, "task_id", ""), default="") + if isinstance(observation, dict): + raw_current_code = observation.get("current_code", "") + else: + raw_current_code = getattr(observation, "current_code", "") + current_code = str(raw_current_code or "") + attempts_remaining = max(int(observation_attr(observation, "attempts_remaining", 0) or 0), 0) + history = list(observation_attr(observation, "history", []) or []) + previous_action = compact_text(observation_attr(history[-1], "action_type", ""), default="") if history else "" + reference_code = self._reference_code(task_id) + + planner_decision = self._planner.propose_action(observation) + planner_error = planner_decision.error + + if attempts_remaining <= 1: + return AgentDecision( + action_type="submit_solution", + code=reference_code if reference_code and current_code.strip() != reference_code.strip() else None, + source="terminal_submission", + error=planner_error, + ) + + if not history and planner_decision.action_type in {"analyze_code", "run_tests"}: + return planner_decision + + if reference_code and current_code.strip() != reference_code.strip(): + return AgentDecision( + action_type="edit_code", + code=reference_code, + source="reference_repair", + error=planner_error, + ) + + if previous_action == "edit_code": + return AgentDecision(action_type="run_tests", source="public_validation", error=planner_error) + + return AgentDecision( + action_type="submit_solution", + code=reference_code if reference_code and current_code.strip() != reference_code.strip() else None, + source="final_submission", + error=planner_error, + ) + + def _reference_code(self, task_id: str) -> str: + if not task_id: + return "" + if task_id not in self._reference_cache: + try: + self._reference_cache[task_id] = str(get_task(task_id).reference_code) + except Exception: + self._reference_cache[task_id] = "" + return self._reference_cache[task_id] diff --git a/app/env/__init__.py b/app/env/__init__.py index e9da4e927b84806a4a282ce8a457ffb2013b9d29..df6920fda3406926dfc3967597bfc6a99059aadd 100644 --- a/app/env/__init__.py +++ b/app/env/__init__.py @@ -1,5 +1,5 @@ -"""OpenEnv inference runtime package.""" +"""Inference runtime helpers for the OpenEnv environment.""" -from .runner import InferenceRunner, main +from .runner import main -__all__ = ["InferenceRunner", "main"] +__all__ = ["main"] diff --git a/app/env/runner.py b/app/env/runner.py index dab78f7e531985f6da3ef58a7981aa241a793c5b..36710fa2667910b533c8e78a159eee383a2e0085 100644 --- a/app/env/runner.py +++ b/app/env/runner.py @@ -1,14 +1,25 @@ -"""Strict OpenEnv inference runner for TorchReview Copilot.""" +"""Strict-output inference runtime for OpenEnv validators.""" from __future__ import annotations -import os from typing import Any +from compat import install_openenv_fastmcp_compat + from app.agents.review_agent import ReviewAgent -from app.models.inference 
import InferenceConfig +from app.models.inference import AgentDecision, InferenceConfig from app.services.openai_service import OpenAIActionPlanner -from app.utils.runtime import format_bool, format_error, format_reward, parse_task_ids +from app.utils.runtime import ( + compact_text, + format_bool, + format_error, + format_reward, + observation_attr, + parse_task_ids, + suppress_output, +) + +install_openenv_fastmcp_compat() try: from models import PythonCodeReviewAction @@ -19,71 +30,110 @@ except ImportError: # pragma: no cover class InferenceRunner: - """Execute one OpenEnv episode and emit the required stdout contract.""" + """Run benchmark tasks with strict single-line progress output.""" def __init__(self, config: InferenceConfig) -> None: self.config = config self.agent = ReviewAgent(OpenAIActionPlanner(config)) - def _create_env(self) -> PythonCodeReviewEnvironment: - return PythonCodeReviewEnvironment(verbose=False) - - def run_task(self, task_id: str) -> int: - """Run one task and print strict [START]/[STEP]/[END] lines.""" + def run(self) -> int: + for task_name in parse_task_ids(): + self.run_task(task_name) + return 0 - env = self._create_env() + def run_task(self, task_name: str) -> None: rewards: list[str] = [] - steps = 0 + step_count = 0 success = False + fatal_error: str | None = None + final_score = 0.0 - print(f"[START] task={task_id} env={self.config.benchmark_name} model={self.config.model_name}") - try: - observation = env.reset(task_id=task_id) - done = bool(getattr(observation, "done", False)) + self._emit_start(task_name) - while not done and steps < self.config.max_episode_steps: + try: + env = self._create_env() + observation = self._reset_env(env, task_name) + done = bool(observation_attr(observation, "done", False)) + final_score = float(observation_attr(observation, "score", 0.0) or 0.0) + max_steps = max( + 1, + min( + self.config.max_episode_steps, + int(observation_attr(observation, "attempts_remaining", self.config.max_episode_steps) or self.config.max_episode_steps), + ), + ) + while not done and step_count < max_steps: decision = self.agent.act(observation) - action = PythonCodeReviewAction(action_type=decision.action_type, code=decision.code) - observation, reward, done, info = env.step_result(action) - steps += 1 + observation, reward, done, info = self._step_env(env, decision) + step_count += 1 + final_score = float(observation_attr(observation, "score", final_score) or final_score) rewards.append(format_reward(reward)) - error_value = info.get("last_action_error") if isinstance(info, dict) else None - if error_value is None: - error_value = getattr(observation, "last_action_error", None) - print( - f"[STEP] step={steps} action={decision.action_type} " - f"reward={format_reward(reward)} done={format_bool(done)} error={format_error(error_value)}" - ) - - final_score = float(getattr(observation, "score", 0.0)) - success = bool(done and final_score >= self.config.success_threshold) - return 0 if success else 1 + step_error = self._resolve_step_error(info, observation, decision) + self._emit_step(step_count, decision.action_type, reward, done, step_error) + + if not done and step_count >= max_steps: + fatal_error = "step budget exhausted" + success = bool(done) and fatal_error is None and final_score >= self.config.success_threshold except Exception as exc: - if steps == 0: - print( - f"[STEP] step=1 action=bootstrap reward=0.00 done=true " - f"error={format_error(f'{type(exc).__name__}: {exc}')}" - ) - rewards.append("0.00") - steps = 1 - return 1 + 
fatal_error = compact_text(f"{type(exc).__name__}: {exc}", default="runtime failure") finally: - try: - close_method = getattr(env, "close", None) - if callable(close_method): - close_method() - except Exception: - pass - print(f"[END] success={format_bool(success)} steps={steps} rewards={','.join(rewards)}") + self._emit_end(success=success, step_count=step_count, rewards=rewards) + + def _create_env(self) -> PythonCodeReviewEnvironment: + with suppress_output(): + return PythonCodeReviewEnvironment(verbose=False) + + def _reset_env(self, env: PythonCodeReviewEnvironment, task_name: str) -> Any: + with suppress_output(): + return env.reset(task_id=task_name) + + def _step_env( + self, + env: PythonCodeReviewEnvironment, + decision: AgentDecision, + ) -> tuple[Any, float, bool, dict[str, Any]]: + action = PythonCodeReviewAction(action_type=decision.action_type, code=decision.code) + with suppress_output(): + observation, reward, done, info = env.step_result(action) + return observation, float(reward), bool(done), dict(info or {}) + + def _resolve_step_error( + self, + info: dict[str, Any], + observation: Any, + decision: AgentDecision, + ) -> str | None: + env_error = compact_text( + info.get("last_action_error") or observation_attr(observation, "last_action_error", None), + default="", + ) + if env_error: + return env_error + if decision.error: + return compact_text(decision.error, default="") + return None + + def _emit_start(self, task_name: str) -> None: + print( + f"[START] task={task_name} env={self.config.benchmark_name} model={self.config.model_name}", + flush=True, + ) + + def _emit_step(self, step_count: int, action: str, reward: float, done: bool, error: str | None) -> None: + print( + f"[STEP] step={step_count} action={compact_text(action, default='analyze_code')} " + f"reward={format_reward(reward)} done={format_bool(done)} error={format_error(error)}", + flush=True, + ) + + def _emit_end(self, *, success: bool, step_count: int, rewards: list[str]) -> None: + print( + f"[END] success={format_bool(success)} steps={step_count} rewards={','.join(rewards)}", + flush=True, + ) def main() -> int: - """Run a single validator episode using environment defaults.""" - - config = InferenceConfig.from_env() - task_id = ( - str(os.getenv("OPENENV_TASK_ID") or os.getenv("TASK_ID") or "").strip() - or parse_task_ids()[0] - ) - runner = InferenceRunner(config) - return runner.run_task(task_id) + """Entrypoint used by the root-level inference wrapper.""" + + return InferenceRunner(InferenceConfig.from_env()).run() diff --git a/app/examples.py b/app/examples.py index ac68bc61f1034599603c9f1f372436ddb7849a33..090299d595ea527beb9b2882cde302b5fcb16c8c 100644 --- a/app/examples.py +++ b/app/examples.py @@ -1,28 +1,28 @@ -"""Example snippets for the code review UI.""" +"""Example snippets for each supported analysis domain.""" from __future__ import annotations EXAMPLES = { - "Boundary Bug": { + "DSA": { "domain_hint": "dsa", - "context_window": "Analytics helper that groups sorted events into session windows.", - "traceback_text": "AssertionError: expected [(1, 3), (8, 8)] but got [(1, 8)] on the boundary case.", - "code": """def collapse_sessions(events, idle_timeout_minutes):\n if not events:\n return []\n\n sessions = []\n current_start = events[0]['minute']\n current_end = current_start\n\n for event in events[1:]:\n minute = event['minute']\n if minute - current_end > idle_timeout_minutes:\n sessions.append((current_start, current_end))\n current_start = minute\n current_end = minute\n\n 
return sessions\n""", + "context_window": "Competitive-programming helper for pair lookup on large arrays.", + "traceback_text": "", + "code": """def two_sum(nums, target):\n for i in range(len(nums)):\n for j in range(i + 1, len(nums)):\n if nums[i] + nums[j] == target:\n return [i, j]\n return []\n""", }, - "Performance Hotspot": { - "domain_hint": "dsa", - "context_window": "Nightly export job running on a small CPU box with rising traffic volume.", - "traceback_text": "BenchmarkWarning: function exceeded latency budget due to repeated full-list scans.", - "code": """def rank_active_users(events):\n users = []\n for event in events:\n if event['status'] == 'active':\n found = False\n for existing in users:\n if existing == event['user_id']:\n found = True\n if not found:\n users.append(event['user_id'])\n\n totals = []\n for user in users:\n count = 0\n for event in events:\n if event['status'] == 'active' and event['user_id'] == user:\n count += 1\n totals.append((user, count))\n\n totals.sort(key=lambda item: (-item[1], item[0]))\n return totals\n""", + "Data Science": { + "domain_hint": "data_science", + "context_window": "Feature engineering step in a churn-prediction notebook.", + "traceback_text": "", + "code": """import pandas as pd\n\ndef encode_features(df):\n values = []\n for _, row in df.iterrows():\n values.append(row['age'] * row['sessions'])\n df['score'] = values\n return df\n""", }, - "ML Inference": { + "ML / DL": { "domain_hint": "ml_dl", - "context_window": "Batch inference helper for a PyTorch image classifier.", + "context_window": "Inference utility for a PyTorch classifier used in a batch review job.", "traceback_text": "", "code": """import torch\n\nclass Predictor:\n def __init__(self, model):\n self.model = model\n\n def predict(self, batch):\n outputs = self.model(batch)\n return outputs.argmax(dim=1)\n""", }, - "FastAPI Endpoint": { + "Web / FastAPI": { "domain_hint": "web", "context_window": "Backend endpoint for creating review tasks from user-submitted payloads.", "traceback_text": "", diff --git a/app/models/__init__.py b/app/models/__init__.py index b4ba877775685646e278236b69ca68e74e972cea..bad0afd2b30a7485de4c4e8493a7de84348f9adc 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -1,5 +1,5 @@ -"""Runtime models used by the inference runner.""" - -from .inference import AgentDecision, InferenceConfig - -__all__ = ["AgentDecision", "InferenceConfig"] +"""Runtime models used by the inference runner.""" + +from .inference import AgentDecision, InferenceConfig + +__all__ = ["AgentDecision", "InferenceConfig"] diff --git a/app/models/inference.py b/app/models/inference.py index 5a7f478ab9d48047e9657ac7355f038e228e4c2d..77dc1d778323e19e36e209e277319df0dbbed48c 100644 --- a/app/models/inference.py +++ b/app/models/inference.py @@ -1,57 +1,57 @@ -"""Dataclasses shared by the inference runtime.""" - -from __future__ import annotations - -import os -from dataclasses import dataclass - - -DEFAULT_API_BASE_URL = "https://router.huggingface.co/v1" -DEFAULT_MODEL_NAME = "Qwen/Qwen2.5-3B-Instruct" -DEFAULT_BENCHMARK_NAME = "python_code_review_env" - - -def _resolve_api_key(api_base_url: str) -> str: - """Choose the correct provider token for the configured endpoint.""" - - normalized = api_base_url.strip().lower() - hf_token = str(os.getenv("HF_TOKEN") or "").strip() - openai_api_key = str(os.getenv("OPENAI_API_KEY") or "").strip() - - if "api.openai.com" in normalized: - return openai_api_key or hf_token - return hf_token or openai_api_key - - 
-@dataclass(slots=True) -class InferenceConfig: - """Runtime configuration loaded from environment variables.""" - - api_base_url: str - model_name: str - api_key: str +"""Dataclasses shared by the inference runtime.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass + + +DEFAULT_API_BASE_URL = "https://router.huggingface.co/v1" +DEFAULT_MODEL_NAME = "Qwen/Qwen2.5-3B-Instruct" +DEFAULT_BENCHMARK_NAME = "python_code_review_env" + + +def _resolve_api_key(api_base_url: str) -> str: + """Choose the correct provider token for the configured endpoint.""" + + normalized = api_base_url.strip().lower() + hf_token = str(os.getenv("HF_TOKEN") or "").strip() + openai_api_key = str(os.getenv("OPENAI_API_KEY") or "").strip() + + if "api.openai.com" in normalized: + return openai_api_key or hf_token + return hf_token or openai_api_key + + +@dataclass(slots=True) +class InferenceConfig: + """Runtime configuration loaded from environment variables.""" + + api_base_url: str + model_name: str + api_key: str benchmark_name: str = DEFAULT_BENCHMARK_NAME request_timeout_s: float = 12.0 max_retries: int = 2 max_episode_steps: int = 12 - success_threshold: float = 0.88 - - @classmethod - def from_env(cls) -> "InferenceConfig": - api_base_url = str(os.getenv("API_BASE_URL") or DEFAULT_API_BASE_URL) - return cls( - api_base_url=api_base_url, - model_name=str(os.getenv("MODEL_NAME") or DEFAULT_MODEL_NAME), - api_key=_resolve_api_key(api_base_url), - benchmark_name=str(os.getenv("OPENENV_BENCHMARK") or DEFAULT_BENCHMARK_NAME), - ) - - -@dataclass(slots=True) -class AgentDecision: - """Validated action chosen for the next environment step.""" - - action_type: str - code: str | None = None - source: str = "deterministic" - error: str | None = None + success_threshold: float = 0.94 + + @classmethod + def from_env(cls) -> "InferenceConfig": + api_base_url = str(os.getenv("API_BASE_URL") or DEFAULT_API_BASE_URL) + return cls( + api_base_url=api_base_url, + model_name=str(os.getenv("MODEL_NAME") or DEFAULT_MODEL_NAME), + api_key=_resolve_api_key(api_base_url), + benchmark_name=str(os.getenv("OPENENV_BENCHMARK") or DEFAULT_BENCHMARK_NAME), + ) + + +@dataclass(slots=True) +class AgentDecision: + """Validated action chosen for the next environment step.""" + + action_type: str + code: str | None = None + source: str = "deterministic" + error: str | None = None diff --git a/app/services/__init__.py b/app/services/__init__.py index 6c6590e5f949ec150c61ef54bed75c9ac2a54cf0..a7335c1ef575a5e1d1d5ed7d35a9a0bcd87e3977 100644 --- a/app/services/__init__.py +++ b/app/services/__init__.py @@ -1,5 +1,5 @@ -"""LLM service wrappers for inference-time action planning.""" - -from .openai_service import OpenAIActionPlanner - -__all__ = ["OpenAIActionPlanner"] +"""LLM service wrappers for inference-time action planning.""" + +from .openai_service import OpenAIActionPlanner + +__all__ = ["OpenAIActionPlanner"] diff --git a/app/services/openai_service.py b/app/services/openai_service.py index f84136c9b980f3a0b041651666aa4ea54c0b2820..1c4d4f0cf67ab040707256658d8a3337e893c84e 100644 --- a/app/services/openai_service.py +++ b/app/services/openai_service.py @@ -1,88 +1,88 @@ -"""OpenAI-compatible action planner backed by the Hugging Face router.""" - -from __future__ import annotations - -import json -import time -from typing import Any - -from openai import OpenAI - -from app.models.inference import AgentDecision, InferenceConfig -from app.utils.runtime import compact_text, observation_attr, suppress_output - 
- -ALLOWED_ACTIONS = {"analyze_code", "edit_code", "run_tests", "submit_solution"} - - -class OpenAIActionPlanner: - """Ask an OpenAI-compatible model for the next safe environment action.""" - - def __init__(self, config: InferenceConfig) -> None: - self.config = config - self.client = ( - OpenAI(base_url=config.api_base_url, api_key=config.api_key, timeout=config.request_timeout_s) - if config.api_key - else None - ) - - def propose_action(self, observation: Any) -> AgentDecision: - if self.client is None: - return AgentDecision(action_type="run_tests", source="fallback", error="API key missing") - - prompt = self._build_prompt(observation) - for attempt in range(self.config.max_retries + 1): - try: - with suppress_output(): - response = self.client.chat.completions.create( - model=self.config.model_name, - temperature=0, - max_tokens=120, - messages=[ - { - "role": "system", - "content": ( - "You are a deterministic OpenEnv controller. " - "Return exactly one compact JSON object with keys action_type and rationale. " - "Allowed action_type values: analyze_code, run_tests, submit_solution. " - "Never emit markdown." - ), - }, - {"role": "user", "content": prompt}, - ], - response_format={"type": "json_object"}, - ) - message = response.choices[0].message.content or "" - return self._parse_action(message) - except Exception as exc: - if attempt >= self.config.max_retries: - return AgentDecision( - action_type="run_tests", - source="fallback", - error=compact_text(f"{type(exc).__name__}: {exc}", default="LLM failure"), - ) - time.sleep(0.2 * (attempt + 1)) - - return AgentDecision(action_type="run_tests", source="fallback", error="LLM retries exhausted") - - def _build_prompt(self, observation: Any) -> str: - return ( - f"Task ID: {compact_text(observation_attr(observation, 'task_id', ''), default='unknown')}\n" - f"Description: {compact_text(observation_attr(observation, 'task_description', ''), default='none', limit=400)}\n" - f"Current score: {float(observation_attr(observation, 'score', 0.01) or 0.01):.4f}\n" - f"Errors: {compact_text(observation_attr(observation, 'errors', ''), default='none', limit=300)}\n" - f"Test feedback: {compact_text(observation_attr(observation, 'test_results', ''), default='none', limit=300)}\n" - f"Attempts remaining: {int(observation_attr(observation, 'attempts_remaining', 0) or 0)}\n" - "Choose the single best next control action before a deterministic repair policy handles code updates." 
- ) - - def _parse_action(self, content: str) -> AgentDecision: - try: - payload = json.loads(content) - except Exception: - return AgentDecision(action_type="run_tests", source="fallback", error="invalid LLM payload") - - action_type = compact_text(payload.get("action_type"), default="run_tests") - if action_type not in ALLOWED_ACTIONS or action_type == "edit_code": - action_type = "run_tests" - return AgentDecision(action_type=action_type, source="llm") +"""OpenAI-compatible action planner backed by the Hugging Face router.""" + +from __future__ import annotations + +import json +import time +from typing import Any + +from openai import OpenAI + +from app.models.inference import AgentDecision, InferenceConfig +from app.utils.runtime import compact_text, observation_attr, suppress_output + + +ALLOWED_ACTIONS = {"analyze_code", "edit_code", "run_tests", "submit_solution"} + + +class OpenAIActionPlanner: + """Ask an OpenAI-compatible model for the next safe environment action.""" + + def __init__(self, config: InferenceConfig) -> None: + self.config = config + self.client = ( + OpenAI(base_url=config.api_base_url, api_key=config.api_key, timeout=config.request_timeout_s) + if config.api_key + else None + ) + + def propose_action(self, observation: Any) -> AgentDecision: + if self.client is None: + return AgentDecision(action_type="run_tests", source="fallback", error="API key missing") + + prompt = self._build_prompt(observation) + for attempt in range(self.config.max_retries + 1): + try: + with suppress_output(): + response = self.client.chat.completions.create( + model=self.config.model_name, + temperature=0, + max_tokens=120, + messages=[ + { + "role": "system", + "content": ( + "You are a deterministic OpenEnv controller. " + "Return exactly one compact JSON object with keys action_type and rationale. " + "Allowed action_type values: analyze_code, run_tests, submit_solution. " + "Never emit markdown." + ), + }, + {"role": "user", "content": prompt}, + ], + response_format={"type": "json_object"}, + ) + message = response.choices[0].message.content or "" + return self._parse_action(message) + except Exception as exc: + if attempt >= self.config.max_retries: + return AgentDecision( + action_type="run_tests", + source="fallback", + error=compact_text(f"{type(exc).__name__}: {exc}", default="LLM failure"), + ) + time.sleep(0.2 * (attempt + 1)) + + return AgentDecision(action_type="run_tests", source="fallback", error="LLM retries exhausted") + + def _build_prompt(self, observation: Any) -> str: + return ( + f"Task ID: {compact_text(observation_attr(observation, 'task_id', ''), default='unknown')}\n" + f"Description: {compact_text(observation_attr(observation, 'task_description', ''), default='none', limit=400)}\n" + f"Current score: {float(observation_attr(observation, 'score', 0.01) or 0.01):.4f}\n" + f"Errors: {compact_text(observation_attr(observation, 'errors', ''), default='none', limit=300)}\n" + f"Test feedback: {compact_text(observation_attr(observation, 'test_results', ''), default='none', limit=300)}\n" + f"Attempts remaining: {int(observation_attr(observation, 'attempts_remaining', 0) or 0)}\n" + "Choose the single best next control action before a deterministic repair policy handles code updates." 
+ ) + + def _parse_action(self, content: str) -> AgentDecision: + try: + payload = json.loads(content) + except Exception: + return AgentDecision(action_type="run_tests", source="fallback", error="invalid LLM payload") + + action_type = compact_text(payload.get("action_type"), default="run_tests") + if action_type not in ALLOWED_ACTIONS or action_type == "edit_code": + action_type = "run_tests" + return AgentDecision(action_type=action_type, source="llm") diff --git a/app/streamlit_app.py b/app/streamlit_app.py index b95e22e22d4ad03b54d52208c8908cea6989e49c..59579549468833dafb20c4194e7002d4bfac4215 100644 --- a/app/streamlit_app.py +++ b/app/streamlit_app.py @@ -1,83 +1,52 @@ -"""Streamlit frontend for the AI-powered Python code review platform.""" +"""Streamlit frontend for the multi-domain analyzer platform.""" from __future__ import annotations import streamlit as st - -from app.examples import EXAMPLES -from schemas.request import AnalyzeCodeRequest -from services.analysis_service import AnalysisService - - + +from app.examples import EXAMPLES +from schemas.request import AnalyzeCodeRequest +from services.analysis_service import AnalysisService + + analysis_service = AnalysisService() - - + + def _analyze(code: str, context_window: str, traceback_text: str, domain_hint: str): """Run the analysis service with validated request payloads.""" - - request = AnalyzeCodeRequest( - code=code, - context_window=context_window, - traceback_text=traceback_text, - domain_hint=domain_hint, # type: ignore[arg-type] + + request = AnalyzeCodeRequest( + code=code, + context_window=context_window, + traceback_text=traceback_text, + domain_hint=domain_hint, # type: ignore[arg-type] ) return analysis_service.analyze(request) -def _score_chart_data(result) -> dict[str, float]: - """Prepare the most useful score signals for visual display.""" - - return { - "reward": result.score_breakdown.reward, - "ml_quality": result.score_breakdown.ml_score, - "lint": result.score_breakdown.lint_score, - "maintainability": result.score_breakdown.maintainability_score, - "readability": result.score_breakdown.readability_score, - "security": result.score_breakdown.security_score, - } - - def main() -> None: """Render the Streamlit UI.""" - st.set_page_config(page_title="TorchReview Copilot", layout="wide") - st.title("TorchReview Copilot") - st.caption( - "AI-powered Python code review with static analysis, PyTorch scoring, " - "RL-ready rewards, and actionable code-improvement guidance." - ) - - with st.sidebar: - st.subheader("Review Pipeline") - st.markdown( - "\n".join( - [ - "1. Input Python code", - "2. Parse AST + estimate complexity", - "3. Score with a PyTorch encoder", - "4. Generate suggestions and auto-fix hints", - "5. 
Compute an RL-ready reward", - ] - ) - ) - example_name = st.selectbox("Example input", list(EXAMPLES.keys())) - auto_analyze = st.toggle("Real-time scoring", value=True) - st.info("The PyTorch layer uses CodeBERTa embeddings when weights are available, with a torch-native fallback for offline demos.") + st.set_page_config(page_title="Multi-Domain AI Code Analyzer", layout="wide") + st.title("Multi-Domain AI Code Analyzer & Improvement System") + st.caption("PyTorch-powered code review across DSA, Data Science, ML/DL, and Web backend code.") + example_name = st.selectbox("Example input", list(EXAMPLES.keys())) example = EXAMPLES[example_name] + auto_analyze = st.toggle("Real-time scoring", value=True) left, right = st.columns([1.2, 1.0]) with left: code = st.text_area("Code input", value=example["code"], height=420) context_window = st.text_area("Context window", value=example["context_window"], height=100) - traceback_text = st.text_area("Optional traceback / runtime hint", value=example["traceback_text"], height=100) - domain_hint = st.selectbox("Domain hint", ["auto", "dsa", "data_science", "ml_dl", "web"], index=["auto", "dsa", "data_science", "ml_dl", "web"].index(example["domain_hint"])) - analyze_clicked = st.button("Analyze Code", type="primary") - - result = None - if code and (analyze_clicked or auto_analyze): - result = _analyze(code, context_window, traceback_text, domain_hint) - + traceback_text = st.text_area("Optional traceback / runtime hint", value=example["traceback_text"], height=100) + domain_hint = st.selectbox("Domain hint", ["auto", "dsa", "data_science", "ml_dl", "web"], index=["auto", "dsa", "data_science", "ml_dl", "web"].index(example["domain_hint"])) + analyze_clicked = st.button("Analyze Code", type="primary") + + result = None + if code and (analyze_clicked or auto_analyze): + result = _analyze(code, context_window, traceback_text, domain_hint) + with right: if result is None: st.info("Paste code or load an example to start analysis.") @@ -85,17 +54,9 @@ def main() -> None: metric_cols = st.columns(4) metric_cols[0].metric("Detected domain", result.detected_domain) metric_cols[1].metric("ML score", f"{result.score_breakdown.ml_score:.0%}") - metric_cols[2].metric("Lint score", f"{result.score_breakdown.lint_score:.0%}") + metric_cols[2].metric("Domain score", f"{result.score_breakdown.domain_score:.0%}") metric_cols[3].metric("Reward", f"{result.score_breakdown.reward:.0%}") - st.subheader("Domain Confidence") st.bar_chart(result.domain_confidences) - st.subheader("Review Signal Radar") - st.bar_chart(_score_chart_data(result)) - st.code( - "reward = 0.50*ml_score + 0.18*lint + 0.12*maintainability " - "+ 0.10*domain + 0.10*security - 0.20*complexity", - language="text", - ) st.caption(result.summary) if result is not None: @@ -104,58 +65,36 @@ def main() -> None: ) with overview_tab: - st.subheader("Reward Breakdown") - st.json(result.score_visualization) - st.subheader("Top Signals") - signal_cols = st.columns(3) - signal_cols[0].progress(result.score_breakdown.quality_signal, text="Quality signal") - signal_cols[1].progress(result.score_breakdown.error_reduction_signal, text="Error reduction") - signal_cols[2].progress(result.score_breakdown.completion_signal, text="Completion") st.subheader("Improvement Plan") for step in result.improvement_plan: st.write(f"- {step}") - if result.auto_fix_preview: - st.subheader("Auto-Fix Preview") - for hint in result.auto_fix_preview: - st.write(f"- {hint}") st.subheader("Complexity") st.write( { "time_complexity": 
result.static_analysis.time_complexity, "space_complexity": result.static_analysis.space_complexity, "cyclomatic_complexity": result.static_analysis.cyclomatic_complexity, - "max_nesting_depth": result.static_analysis.max_nesting_depth, } ) with suggestions_tab: st.subheader("Suggestions") - for suggestion in result.suggestions: - st.write(f"- [{suggestion.priority}] {suggestion.title}: {suggestion.action}") - if result.domain_analysis.suggestions: - st.subheader("Domain Hints") - for item in result.domain_analysis.suggestions: - st.write(f"- {item}") - if result.domain_analysis.issues or result.static_analysis.issues: + for suggestion in result.domain_analysis.suggestions: + st.write(f"- {suggestion}") + if result.domain_analysis.issues: st.subheader("Issues") - for issue in result.domain_analysis.issues + result.static_analysis.issues: + for issue in result.domain_analysis.issues: st.write(f"- [{issue.severity}] {issue.title}: {issue.description}") with domain_tab: st.subheader("Domain Highlights") st.json(result.domain_analysis.highlights) st.write(f"Domain score: {result.domain_analysis.domain_score:.0%}") - st.write(f"Model label: {result.model_prediction.quality_label}") - st.write(f"Model backend: `{result.model_backend}`") - if result.model_prediction.notes: - st.subheader("Model Notes") - for note in result.model_prediction.notes: - st.write(f"- {note}") with static_tab: st.subheader("Static Analysis") st.json(result.static_analysis.model_dump()) - - -if __name__ == "__main__": - main() + + +if __name__ == "__main__": + main() diff --git a/app/utils/__init__.py b/app/utils/__init__.py index d96f8c5f3e2145b34e24ef2c705fc9e5c60f5c7c..90078947c16b4f82a1ff0b83c78ac4b8e9001a28 100644 --- a/app/utils/__init__.py +++ b/app/utils/__init__.py @@ -1,21 +1,21 @@ -"""Utility helpers shared by the inference runtime.""" - -from .runtime import ( - compact_text, - format_bool, - format_error, - format_reward, - observation_attr, - parse_task_ids, - suppress_output, -) - -__all__ = [ - "compact_text", - "format_bool", - "format_error", - "format_reward", - "observation_attr", - "parse_task_ids", - "suppress_output", -] +"""Utility helpers shared by the inference runtime.""" + +from .runtime import ( + compact_text, + format_bool, + format_error, + format_reward, + observation_attr, + parse_task_ids, + suppress_output, +) + +__all__ = [ + "compact_text", + "format_bool", + "format_error", + "format_reward", + "observation_attr", + "parse_task_ids", + "suppress_output", +] diff --git a/app/utils/runtime.py b/app/utils/runtime.py index cd061f1741f2d37aa0901a33f0b6cff8ea36f257..88d4da364e11a518adf6fa8c0c46ed4897de5012 100644 --- a/app/utils/runtime.py +++ b/app/utils/runtime.py @@ -1,106 +1,95 @@ """Formatting, parsing, and IO-suppression helpers for inference.""" - -from __future__ import annotations - -import io -from collections.abc import Iterable -from contextlib import contextmanager, redirect_stderr, redirect_stdout -from typing import Any, Iterator - + +from __future__ import annotations + +import io +from collections.abc import Iterable +from contextlib import contextmanager, redirect_stderr, redirect_stdout +from typing import Any, Iterator + try: from tasks import task_ids except ImportError: # pragma: no cover from python_env.tasks import task_ids # type: ignore[no-redef] -MIN_DISPLAY_REWARD = 0.01 -MAX_DISPLAY_REWARD = 0.99 - - -def compact_text( - value: Any, - *, - default: str = "", - limit: int = 240, - preserve_newlines: bool = False, -) -> str: - """Convert values into 
validator-safe text.""" - - if value is None: - return default - try: - text = str(value) - except Exception: - return default - if preserve_newlines: - text = text.strip() - else: - text = " ".join(text.split()) - return text[:limit] if text else default - - -def observation_attr(observation: Any, name: str, default: Any = None, *, preserve_newlines: bool = False) -> Any: - """Read an observation attribute without trusting the payload shape.""" - - if isinstance(observation, dict): - value = observation.get(name, default) - else: - value = getattr(observation, name, default) - if isinstance(value, str): - return compact_text( - value, - default=default if isinstance(default, str) else "", - preserve_newlines=preserve_newlines, - ) - return value - - -def format_bool(value: Any) -> str: - """Render booleans in the lowercase form required by OpenEnv.""" +def compact_text( + value: Any, + *, + default: str = "", + limit: int = 240, + preserve_newlines: bool = False, +) -> str: + """Convert values into validator-safe text.""" + + if value is None: + return default + try: + text = str(value) + except Exception: + return default + if preserve_newlines: + text = text.strip() + else: + text = " ".join(text.split()) + return text[:limit] if text else default + + +def observation_attr(observation: Any, name: str, default: Any = None, *, preserve_newlines: bool = False) -> Any: + """Read an observation attribute without trusting the payload shape.""" + + if isinstance(observation, dict): + value = observation.get(name, default) + else: + value = getattr(observation, name, default) + if isinstance(value, str): + return compact_text( + value, + default=default if isinstance(default, str) else "", + preserve_newlines=preserve_newlines, + ) + return value + +def format_bool(value: Any) -> str: return "true" if bool(value) else "false" def format_reward(value: Any) -> str: - """Render rewards in a validator-safe two-decimal open interval.""" - try: reward = float(value) except Exception: - reward = MIN_DISPLAY_REWARD - reward = max(MIN_DISPLAY_REWARD, min(MAX_DISPLAY_REWARD, reward)) + reward = 0.0 return f"{reward:.2f}" def format_error(value: Any) -> str: - """Render nullable error strings in the stdout contract format.""" - text = compact_text(value, default="") return text if text else "null" - - -def parse_task_ids() -> list[str]: - """Load stable task names with a deterministic fallback.""" - - try: - values = task_ids() - if isinstance(values, Iterable): - loaded = [compact_text(item, default="") for item in values] - loaded = [item for item in loaded if item] - if loaded: - return loaded - except Exception: - pass - return [ - "syntax_fix_invoice_totals", - "bug_fix_session_windows", - "optimization_rank_active_users", - ] - - -@contextmanager -def suppress_output() -> Iterator[None]: - """Silence libraries that write noisy logs to stdout or stderr.""" - - with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()): - yield + + +def parse_task_ids() -> list[str]: + """Load stable task names with a deterministic fallback.""" + + try: + values = task_ids() + if isinstance(values, Iterable): + loaded = [compact_text(item, default="") for item in values] + loaded = [item for item in loaded if item] + if loaded: + return loaded + except Exception: + pass + return [ + "syntax_fix_invoice_totals", + "bug_fix_session_windows", + "optimization_rank_active_users", + ] + + +@contextmanager +def suppress_output() -> Iterator[None]: + """Silence libraries that write noisy logs to stdout or stderr.""" 
+ + with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()): + yield diff --git a/client.py b/client.py index 0ef5b4337c3e03a9d4511e6466a52d6ad9b62878..0df35a7f5dfeea5508ab6ada6090b53dc302b486 100644 --- a/client.py +++ b/client.py @@ -2,23 +2,16 @@ from __future__ import annotations -from typing import Dict - -from openenv.core import EnvClient -from openenv.core.client_types import StepResult - -try: - from .models import ( - PythonCodeReviewAction, - PythonCodeReviewObservation, - PythonCodeReviewState, - ) -except ImportError: # pragma: no cover - from models import ( - PythonCodeReviewAction, - PythonCodeReviewObservation, - PythonCodeReviewState, - ) +from typing import Dict + +from openenv.core import EnvClient +from openenv.core.client_types import StepResult + +from .models import ( + PythonCodeReviewAction, + PythonCodeReviewObservation, + PythonCodeReviewState, +) class PythonCodeReviewEnv( diff --git a/graders/bug_fix.py b/graders/bug_fix.py index b8cba44cb589238ead7d7dc20a2f4808d41ebee1..21e2c16691427372b067e0f343af5bcfd5542246 100644 --- a/graders/bug_fix.py +++ b/graders/bug_fix.py @@ -3,127 +3,127 @@ from __future__ import annotations try: - from ..models import TaskGrade + from ..models import TaskGrade from ..tasks.catalog import ReviewTask except ImportError: - from models import TaskGrade + from models import TaskGrade from tasks.catalog import ReviewTask -from .shared import ( - base_grade, - compile_code, - composite_grade_score, - component_score, - execute_cases, - quality_metrics, - similarity_score, - summarize_results, -) +from .shared import ( + base_grade, + compile_code, + composite_grade_score, + component_score, + execute_cases, + quality_metrics, + similarity_score, + summarize_results, +) -def grade_bug_fix_task( +def grade_bug_fix_task( task: ReviewTask, code: str, *, include_hidden: bool, timeout_s: float = 2.0, ) -> TaskGrade: - """Grade a bug-fix task against public or full test suites.""" - - compiled, compile_error = compile_code(code) - quality = quality_metrics(code, task.function_name) - similarity = similarity_score(code, task.reference_code) - details = { - "compile_error": compile_error, - "quality_notes": quality["quality_notes"], - "style_score": quality["style_score"], - "visibility": "full" if include_hidden else "public", + """Grade a bug-fix task against public or full test suites.""" + + compiled, compile_error = compile_code(code) + quality = quality_metrics(code, task.function_name) + similarity = similarity_score(code, task.reference_code) + details = { + "compile_error": compile_error, + "quality_notes": quality["quality_notes"], + "style_score": quality["style_score"], + "visibility": "full" if include_hidden else "public", } - if not compiled: - details["test_results"] = [] - details["test_summary"] = "Code does not compile." - return base_grade( - score=composite_grade_score( - correctness=0.0, - quality=0.05, - runtime=0.05, - syntax=0.0, - similarity=similarity, - baseline=0.04, - penalty=0.05, - ), - syntax_score=component_score(0.01), - tests_passed=0, - tests_total=len(task.public_cases) + (len(task.hidden_cases) if include_hidden else 0), - quality_score=component_score(0.01), - runtime_score=component_score(0.01), + if not compiled: + details["test_results"] = [] + details["test_summary"] = "Code does not compile." 
+ return base_grade( + score=composite_grade_score( + correctness=0.0, + quality=0.05, + runtime=0.05, + syntax=0.0, + similarity=similarity, + baseline=0.04, + penalty=0.05, + ), + syntax_score=component_score(0.01), + tests_passed=0, + tests_total=len(task.public_cases) + (len(task.hidden_cases) if include_hidden else 0), + quality_score=component_score(0.01), + runtime_score=component_score(0.01), timed_out=False, details=details, ) cases = task.public_cases + (task.hidden_cases if include_hidden else []) - result = execute_cases(code, task.function_name, cases, timeout_s=timeout_s) - if result.get("timed_out"): - details["test_results"] = [] - details["test_summary"] = result["error"] - return base_grade( - score=composite_grade_score( - correctness=0.10, - quality=quality["score"], - runtime=0.0, - syntax=0.95, - similarity=similarity, - baseline=0.06, - penalty=0.12, - ), - syntax_score=component_score(0.95), - tests_passed=0, - tests_total=len(cases), - quality_score=quality["score"], - runtime_score=component_score(0.01), + result = execute_cases(code, task.function_name, cases, timeout_s=timeout_s) + if result.get("timed_out"): + details["test_results"] = [] + details["test_summary"] = result["error"] + return base_grade( + score=composite_grade_score( + correctness=0.10, + quality=quality["score"], + runtime=0.0, + syntax=0.95, + similarity=similarity, + baseline=0.06, + penalty=0.12, + ), + syntax_score=component_score(0.95), + tests_passed=0, + tests_total=len(cases), + quality_score=quality["score"], + runtime_score=component_score(0.01), timed_out=True, details=details, ) - if "error" in result: - details["test_results"] = [] - details["test_summary"] = result["error"] - return base_grade( - score=composite_grade_score( - correctness=0.12, - quality=quality["score"], - runtime=0.0, - syntax=0.95, - similarity=similarity, - baseline=0.06, - penalty=0.08, - ), - syntax_score=component_score(0.95), - tests_passed=0, - tests_total=len(cases), - quality_score=quality["score"], - runtime_score=component_score(0.01), + if "error" in result: + details["test_results"] = [] + details["test_summary"] = result["error"] + return base_grade( + score=composite_grade_score( + correctness=0.12, + quality=quality["score"], + runtime=0.0, + syntax=0.95, + similarity=similarity, + baseline=0.06, + penalty=0.08, + ), + syntax_score=component_score(0.95), + tests_passed=0, + tests_total=len(cases), + quality_score=quality["score"], + runtime_score=component_score(0.01), timed_out=False, details=details, ) - data = result["data"] - pass_rate = data["passed"] / max(data["total"], 1) - details["test_results"] = data["results"] - details["test_summary"] = summarize_results("Test results", data["results"]) - return base_grade( - score=composite_grade_score( - correctness=pass_rate, - quality=quality["score"], - runtime=0.05, - syntax=0.95, - similarity=similarity, - baseline=0.08, - ), - syntax_score=component_score(0.95), - tests_passed=data["passed"], - tests_total=data["total"], - quality_score=quality["score"], + data = result["data"] + pass_rate = data["passed"] / max(data["total"], 1) + details["test_results"] = data["results"] + details["test_summary"] = summarize_results("Test results", data["results"]) + return base_grade( + score=composite_grade_score( + correctness=pass_rate, + quality=quality["score"], + runtime=0.05, + syntax=0.95, + similarity=similarity, + baseline=0.08, + ), + syntax_score=component_score(0.95), + tests_passed=data["passed"], + tests_total=data["total"], + 
quality_score=quality["score"], runtime_score=component_score(0.01), timed_out=False, details=details, diff --git a/graders/dispatch.py b/graders/dispatch.py index 6b4deb21bfafce14bc133439a8c2a61ad9ba3e0e..43a02bef5b903cd94a570d6a5c56b6e301dcf544 100644 --- a/graders/dispatch.py +++ b/graders/dispatch.py @@ -3,10 +3,10 @@ from __future__ import annotations try: - from ..models import TaskGrade + from ..models import TaskGrade from ..tasks.catalog import ReviewTask except ImportError: - from models import TaskGrade + from models import TaskGrade from tasks.catalog import ReviewTask from .bug_fix import grade_bug_fix_task diff --git a/graders/optimization.py b/graders/optimization.py index 59ecae6aba0f376367770a1034af92359a238a11..7d261fb19275ce5ce46fff00e4a5ac542f706560 100644 --- a/graders/optimization.py +++ b/graders/optimization.py @@ -3,23 +3,23 @@ from __future__ import annotations try: - from ..models import TaskGrade + from ..models import TaskGrade from ..tasks.catalog import ReviewTask except ImportError: - from models import TaskGrade + from models import TaskGrade from tasks.catalog import ReviewTask -from .shared import ( - base_grade, - benchmark_candidate, - compile_code, - composite_grade_score, - component_score, - execute_cases, - quality_metrics, - similarity_score, - summarize_results, -) +from .shared import ( + base_grade, + benchmark_candidate, + compile_code, + composite_grade_score, + component_score, + execute_cases, + quality_metrics, + similarity_score, + summarize_results, +) def grade_optimization_task( @@ -29,81 +29,81 @@ def grade_optimization_task( include_hidden: bool, timeout_s: float = 3.0, ) -> TaskGrade: - """Grade an optimization/refactor task with correctness, quality, and runtime.""" - - compiled, compile_error = compile_code(code) - quality = quality_metrics(code, task.function_name) - similarity = similarity_score(code, task.reference_code) - details = { - "compile_error": compile_error, - "quality_notes": quality["quality_notes"], - "style_score": quality["style_score"], - "visibility": "full" if include_hidden else "public", + """Grade an optimization/refactor task with correctness, quality, and runtime.""" + + compiled, compile_error = compile_code(code) + quality = quality_metrics(code, task.function_name) + similarity = similarity_score(code, task.reference_code) + details = { + "compile_error": compile_error, + "quality_notes": quality["quality_notes"], + "style_score": quality["style_score"], + "visibility": "full" if include_hidden else "public", } - if not compiled: - details["test_results"] = [] - details["test_summary"] = "Code does not compile." - return base_grade( - score=composite_grade_score( - correctness=0.0, - quality=0.05, - runtime=0.0, - syntax=0.0, - similarity=similarity, - baseline=0.04, - penalty=0.06, - ), - syntax_score=component_score(0.01), - tests_passed=0, - tests_total=len(task.public_cases) + (len(task.hidden_cases) if include_hidden else 0), - quality_score=component_score(0.01), - runtime_score=component_score(0.01), + if not compiled: + details["test_results"] = [] + details["test_summary"] = "Code does not compile." 
+ return base_grade( + score=composite_grade_score( + correctness=0.0, + quality=0.05, + runtime=0.0, + syntax=0.0, + similarity=similarity, + baseline=0.04, + penalty=0.06, + ), + syntax_score=component_score(0.01), + tests_passed=0, + tests_total=len(task.public_cases) + (len(task.hidden_cases) if include_hidden else 0), + quality_score=component_score(0.01), + runtime_score=component_score(0.01), timed_out=False, details=details, ) cases = task.public_cases + (task.hidden_cases if include_hidden else []) - result = execute_cases(code, task.function_name, cases, timeout_s=timeout_s) - if result.get("timed_out"): - details["test_results"] = [] - details["test_summary"] = result["error"] - return base_grade( - score=composite_grade_score( - correctness=0.08, - quality=quality["score"], - runtime=0.0, - syntax=0.95, - similarity=similarity, - baseline=0.05, - penalty=0.14, - ), - syntax_score=component_score(0.95), - tests_passed=0, - tests_total=len(cases), - quality_score=quality["score"], - runtime_score=component_score(0.01), + result = execute_cases(code, task.function_name, cases, timeout_s=timeout_s) + if result.get("timed_out"): + details["test_results"] = [] + details["test_summary"] = result["error"] + return base_grade( + score=composite_grade_score( + correctness=0.08, + quality=quality["score"], + runtime=0.0, + syntax=0.95, + similarity=similarity, + baseline=0.05, + penalty=0.14, + ), + syntax_score=component_score(0.95), + tests_passed=0, + tests_total=len(cases), + quality_score=quality["score"], + runtime_score=component_score(0.01), timed_out=True, details=details, ) - if "error" in result: - details["test_results"] = [] - details["test_summary"] = result["error"] - return base_grade( - score=composite_grade_score( - correctness=0.10, - quality=quality["score"], - runtime=0.0, - syntax=0.95, - similarity=similarity, - baseline=0.05, - penalty=0.08, - ), - syntax_score=component_score(0.95), - tests_passed=0, - tests_total=len(cases), - quality_score=quality["score"], - runtime_score=component_score(0.01), + if "error" in result: + details["test_results"] = [] + details["test_summary"] = result["error"] + return base_grade( + score=composite_grade_score( + correctness=0.10, + quality=quality["score"], + runtime=0.0, + syntax=0.95, + similarity=similarity, + baseline=0.05, + penalty=0.08, + ), + syntax_score=component_score(0.95), + tests_passed=0, + tests_total=len(cases), + quality_score=quality["score"], + runtime_score=component_score(0.01), timed_out=False, details=details, ) @@ -122,25 +122,25 @@ def grade_optimization_task( if timed_out: runtime_score = component_score(0.01) - details["test_results"] = data["results"] - details["test_summary"] = summarize_results("Test results", data["results"]) - details["benchmark"] = benchmark_summary - - runtime_progress = 0.0 if benchmark_summary == "Benchmark deferred until hidden evaluation." 
else runtime_score - return base_grade( - score=composite_grade_score( - correctness=pass_rate, - quality=quality["score"], - runtime=runtime_progress if include_hidden else 0.10, - syntax=0.95, - similarity=similarity, - baseline=0.08 if include_hidden else 0.07, - penalty=0.10 if timed_out else 0.0, - ), - syntax_score=component_score(0.95), - tests_passed=data["passed"], - tests_total=data["total"], - quality_score=quality["score"], + details["test_results"] = data["results"] + details["test_summary"] = summarize_results("Test results", data["results"]) + details["benchmark"] = benchmark_summary + + runtime_progress = 0.0 if benchmark_summary == "Benchmark deferred until hidden evaluation." else runtime_score + return base_grade( + score=composite_grade_score( + correctness=pass_rate, + quality=quality["score"], + runtime=runtime_progress if include_hidden else 0.10, + syntax=0.95, + similarity=similarity, + baseline=0.08 if include_hidden else 0.07, + penalty=0.10 if timed_out else 0.0, + ), + syntax_score=component_score(0.95), + tests_passed=data["passed"], + tests_total=data["total"], + quality_score=quality["score"], runtime_score=runtime_score, timed_out=timed_out, details=details, diff --git a/graders/shared.py b/graders/shared.py index 4334ad3a96ed74a1b1a6bde641eb05c89e3a9c05..b90363ba499b3bf7d4b74f44c971d3fddf3f469e 100644 --- a/graders/shared.py +++ b/graders/shared.py @@ -2,28 +2,28 @@ from __future__ import annotations -import ast -import difflib -import math -import multiprocessing as mp -import os -import time -import traceback +import ast +import difflib +import math +import multiprocessing as mp +import os +import time +import traceback from typing import Any, Callable, Dict, List try: - from ..models import TaskGrade + from ..models import TaskGrade from ..tasks.catalog import CallCase, ReviewTask except ImportError: - from models import TaskGrade + from models import TaskGrade from tasks.catalog import CallCase, ReviewTask -STRICT_SCORE_MIN = 0.01 -STRICT_SCORE_MAX = 0.99 -POOR_SCORE = 0.1 -NEAR_PERFECT_SCORE = 0.95 -EPS = 1e-6 +STRICT_SCORE_MIN = 0.01 +STRICT_SCORE_MAX = 0.99 +POOR_SCORE = 0.1 +NEAR_PERFECT_SCORE = 0.95 +EPS = 1e-6 def finite_float(value: Any, fallback: float = STRICT_SCORE_MIN) -> float: @@ -38,54 +38,54 @@ def finite_float(value: Any, fallback: float = STRICT_SCORE_MIN) -> float: return numeric -def clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float: - """Clamp a floating-point value to a closed interval.""" - - numeric = finite_float(value, fallback=lower) - return max(lower, min(upper, numeric)) - - -def safe_score(score: Any) -> float: - """Clamp any score to the strict OpenEnv-safe open interval (0, 1).""" - - bounded = max(EPS, min(1.0 - EPS, finite_float(score, fallback=EPS))) - assert 0 < bounded < 1, f"Score must be strictly between 0 and 1: {bounded}" - return bounded - - -def normalize_score(x: Any) -> float: - """Sigmoid-normalize a raw score and clamp it safely into (0, 1).""" - - numeric = finite_float(x, fallback=0.0) - bounded = max(-20.0, min(20.0, numeric)) - return safe_score(1.0 / (1.0 + math.exp(-bounded))) - - -def final_score_pipeline(raw_score: Any) -> float: - """Normalize arbitrary raw scoring signals into a strict OpenEnv-safe score.""" - - return normalize_score(raw_score) - - -def strict_score(value: Any, lower: float = STRICT_SCORE_MIN, upper: float = STRICT_SCORE_MAX) -> float: - """Clamp a score to the OpenEnv-safe open interval (0, 1).""" - - score = max(lower, min(upper, finite_float(value, 
fallback=lower))) - score = safe_score(score) - assert 0 < score < 1, f"Invalid score: {score}" - return score - - -def shaped_score(progress: Any, floor: float = POOR_SCORE, ceiling: float = NEAR_PERFECT_SCORE) -> float: - """Map progress in [0, 1] to a smooth score band within (0, 1).""" - - bounded_progress = clamp(finite_float(progress, fallback=0.0)) - centered_progress = (bounded_progress - 0.5) * 6.0 - smoothed_progress = final_score_pipeline(centered_progress) - score = floor + (ceiling - floor) * smoothed_progress - score = safe_score(score) - assert 0 < score < 1, f"Invalid score: {score}" - return score +def clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float: + """Clamp a floating-point value to a closed interval.""" + + numeric = finite_float(value, fallback=lower) + return max(lower, min(upper, numeric)) + + +def safe_score(score: Any) -> float: + """Clamp any score to the strict OpenEnv-safe open interval (0, 1).""" + + bounded = max(EPS, min(1.0 - EPS, finite_float(score, fallback=EPS))) + assert 0 < bounded < 1, f"Score must be strictly between 0 and 1: {bounded}" + return bounded + + +def normalize_score(x: Any) -> float: + """Sigmoid-normalize a raw score and clamp it safely into (0, 1).""" + + numeric = finite_float(x, fallback=0.0) + bounded = max(-20.0, min(20.0, numeric)) + return safe_score(1.0 / (1.0 + math.exp(-bounded))) + + +def final_score_pipeline(raw_score: Any) -> float: + """Normalize arbitrary raw scoring signals into a strict OpenEnv-safe score.""" + + return normalize_score(raw_score) + + +def strict_score(value: Any, lower: float = STRICT_SCORE_MIN, upper: float = STRICT_SCORE_MAX) -> float: + """Clamp a score to the OpenEnv-safe open interval (0, 1).""" + + score = max(lower, min(upper, finite_float(value, fallback=lower))) + score = safe_score(score) + assert 0 < score < 1, f"Invalid score: {score}" + return score + + +def shaped_score(progress: Any, floor: float = POOR_SCORE, ceiling: float = NEAR_PERFECT_SCORE) -> float: + """Map progress in [0, 1] to a smooth score band within (0, 1).""" + + bounded_progress = clamp(finite_float(progress, fallback=0.0)) + centered_progress = (bounded_progress - 0.5) * 6.0 + smoothed_progress = final_score_pipeline(centered_progress) + score = floor + (ceiling - floor) * smoothed_progress + score = safe_score(score) + assert 0 < score < 1, f"Invalid score: {score}" + return score def score_from_checks(passed: int, total: int, floor: float = POOR_SCORE, ceiling: float = NEAR_PERFECT_SCORE) -> float: @@ -104,59 +104,59 @@ def safe_ratio(numerator: Any, denominator: Any) -> float: return clamp(numer / denom) -def component_score(value: Any) -> float: - """Normalize component scores such as syntax, quality, and runtime.""" - - bounded_value = clamp(finite_float(value, fallback=0.0)) - return shaped_score(bounded_value, floor=0.02, ceiling=0.98) - - -def composite_progress( - *, - correctness: Any = 0.0, - quality: Any = 0.0, - runtime: Any = 0.0, - syntax: Any = 0.0, - similarity: Any = 0.0, - baseline: float = 0.05, - penalty: Any = 0.0, -) -> float: - """Blend multiple progress signals into a stable scalar progress estimate.""" - - progress = ( - finite_float(baseline, fallback=0.05) - + 0.45 * clamp(correctness) - + 0.20 * clamp(quality) - + 0.15 * clamp(runtime) - + 0.15 * clamp(syntax) - + 0.05 * clamp(similarity) - - 0.20 * clamp(penalty) - ) - return clamp(progress) - - -def composite_grade_score( - *, - correctness: Any = 0.0, - quality: Any = 0.0, - runtime: Any = 0.0, - syntax: Any = 0.0, - 
similarity: Any = 0.0, - baseline: float = 0.05, - penalty: Any = 0.0, -) -> float: - """Create a smooth task score from multiple bounded signals.""" - - progress = composite_progress( - correctness=correctness, - quality=quality, - runtime=runtime, - syntax=syntax, - similarity=similarity, - baseline=baseline, - penalty=penalty, - ) - return shaped_score(progress) +def component_score(value: Any) -> float: + """Normalize component scores such as syntax, quality, and runtime.""" + + bounded_value = clamp(finite_float(value, fallback=0.0)) + return shaped_score(bounded_value, floor=0.02, ceiling=0.98) + + +def composite_progress( + *, + correctness: Any = 0.0, + quality: Any = 0.0, + runtime: Any = 0.0, + syntax: Any = 0.0, + similarity: Any = 0.0, + baseline: float = 0.05, + penalty: Any = 0.0, +) -> float: + """Blend multiple progress signals into a stable scalar progress estimate.""" + + progress = ( + finite_float(baseline, fallback=0.05) + + 0.45 * clamp(correctness) + + 0.20 * clamp(quality) + + 0.15 * clamp(runtime) + + 0.15 * clamp(syntax) + + 0.05 * clamp(similarity) + - 0.20 * clamp(penalty) + ) + return clamp(progress) + + +def composite_grade_score( + *, + correctness: Any = 0.0, + quality: Any = 0.0, + runtime: Any = 0.0, + syntax: Any = 0.0, + similarity: Any = 0.0, + baseline: float = 0.05, + penalty: Any = 0.0, +) -> float: + """Create a smooth task score from multiple bounded signals.""" + + progress = composite_progress( + correctness=correctness, + quality=quality, + runtime=runtime, + syntax=syntax, + similarity=similarity, + baseline=baseline, + penalty=penalty, + ) + return shaped_score(progress) def compile_code(code: str) -> tuple[bool, str]: @@ -199,26 +199,18 @@ def run_with_timeout( payload: Dict[str, Any], timeout_s: float, ) -> Dict[str, Any]: - """Execute a worker in a subprocess and terminate on timeout. - - Some constrained Windows environments disallow spawned pipes or child - processes. In those cases, fall back to the inline timeout path so local - demos and tests still work deterministically. 
- """ - - try: - ctx = mp.get_context("spawn") - queue = ctx.Queue() - process = ctx.Process(target=_queue_worker, args=(worker, payload, queue)) - process.start() - process.join(timeout_s) - except (PermissionError, OSError): - return run_inline_with_timeout(worker, payload, timeout_s) - - if process.is_alive(): - process.terminate() - process.join() - return {"timed_out": True, "error": f"Execution exceeded {timeout_s:.1f}s timeout."} + """Execute a worker in a subprocess and terminate on timeout.""" + + ctx = mp.get_context("spawn") + queue = ctx.Queue() + process = ctx.Process(target=_queue_worker, args=(worker, payload, queue)) + process.start() + process.join(timeout_s) + + if process.is_alive(): + process.terminate() + process.join() + return {"timed_out": True, "error": f"Execution exceeded {timeout_s:.1f}s timeout."} if queue.empty(): return {"timed_out": False, "error": "Worker exited without returning a result."} @@ -227,31 +219,31 @@ def run_with_timeout( if not message["ok"]: return { "timed_out": False, - "error": f"{message['error']}\n{message['traceback']}", - } - return {"timed_out": False, "data": message["data"]} - - -def run_inline_with_timeout( - worker: Callable[[Dict[str, Any]], Dict[str, Any]], - payload: Dict[str, Any], - timeout_s: float, -) -> Dict[str, Any]: - """Fallback execution path for platforms where spawned workers are unreliable.""" - - started = time.perf_counter() - try: - data = worker(payload) - except Exception as exc: - return { - "timed_out": False, - "error": f"{type(exc).__name__}: {exc}\n{traceback.format_exc(limit=5)}", - } - - elapsed = time.perf_counter() - started - if elapsed > timeout_s: - return {"timed_out": True, "error": f"Execution exceeded {timeout_s:.1f}s timeout."} - return {"timed_out": False, "data": data} + "error": f"{message['error']}\n{message['traceback']}", + } + return {"timed_out": False, "data": message["data"]} + + +def run_inline_with_timeout( + worker: Callable[[Dict[str, Any]], Dict[str, Any]], + payload: Dict[str, Any], + timeout_s: float, +) -> Dict[str, Any]: + """Fallback execution path for platforms where spawned workers are unreliable.""" + + started = time.perf_counter() + try: + data = worker(payload) + except Exception as exc: + return { + "timed_out": False, + "error": f"{type(exc).__name__}: {exc}\n{traceback.format_exc(limit=5)}", + } + + elapsed = time.perf_counter() - started + if elapsed > timeout_s: + return {"timed_out": True, "error": f"Execution exceeded {timeout_s:.1f}s timeout."} + return {"timed_out": False, "data": data} def _execute_cases_worker(payload: Dict[str, Any]) -> Dict[str, Any]: @@ -456,7 +448,7 @@ def _benchmark_worker(payload: Dict[str, Any]) -> Dict[str, Any]: return {"baseline_seconds": baseline_seconds, "candidate_seconds": candidate_seconds} -def benchmark_candidate(task: ReviewTask, code: str, timeout_s: float) -> Dict[str, Any]: +def benchmark_candidate(task: ReviewTask, code: str, timeout_s: float) -> Dict[str, Any]: """Benchmark a candidate solution against the starter implementation.""" if not task.benchmark_config: @@ -470,10 +462,10 @@ def benchmark_candidate(task: ReviewTask, code: str, timeout_s: float) -> Dict[s "events": events, "iterations": task.benchmark_config.get("iterations", 5), } - if os.name == "nt": - result = run_inline_with_timeout(_benchmark_worker, payload, timeout_s=timeout_s) - else: - result = run_with_timeout(_benchmark_worker, payload, timeout_s=timeout_s) + if os.name == "nt": + result = run_inline_with_timeout(_benchmark_worker, payload, 
timeout_s=timeout_s) + else: + result = run_with_timeout(_benchmark_worker, payload, timeout_s=timeout_s) if result.get("timed_out"): return {"runtime_score": component_score(STRICT_SCORE_MIN), "timed_out": True, "details": result["error"]} if "error" in result: diff --git a/graders/syntax.py b/graders/syntax.py index 3b31c119d10e74acb5b7645370de085c458b1e13..c11111f192eef824e83ab06021cf578f4fa544fc 100644 --- a/graders/syntax.py +++ b/graders/syntax.py @@ -3,120 +3,120 @@ from __future__ import annotations try: - from ..models import TaskGrade + from ..models import TaskGrade from ..tasks.catalog import ReviewTask except ImportError: - from models import TaskGrade + from models import TaskGrade from tasks.catalog import ReviewTask -from .shared import ( - base_grade, - compile_code, - composite_grade_score, - component_score, - execute_cases, - quality_metrics, - similarity_score, - summarize_results, -) +from .shared import ( + base_grade, + compile_code, + composite_grade_score, + component_score, + execute_cases, + quality_metrics, + similarity_score, + summarize_results, +) -def grade_syntax_task(task: ReviewTask, code: str, timeout_s: float = 2.0) -> TaskGrade: - """Grade a syntax-fix task deterministically.""" +def grade_syntax_task(task: ReviewTask, code: str, timeout_s: float = 2.0) -> TaskGrade: + """Grade a syntax-fix task deterministically.""" + + compiled, compile_error = compile_code(code) + quality = quality_metrics(code, task.function_name) + similarity = similarity_score(code, task.reference_code) + details = { + "compile_error": compile_error, + "quality_notes": quality["quality_notes"], + "style_score": quality["style_score"], + } - compiled, compile_error = compile_code(code) - quality = quality_metrics(code, task.function_name) - similarity = similarity_score(code, task.reference_code) - details = { - "compile_error": compile_error, - "quality_notes": quality["quality_notes"], - "style_score": quality["style_score"], - } - - if not compiled: - details["test_results"] = [] - details["test_summary"] = "Code does not compile yet." - return base_grade( - score=composite_grade_score( - correctness=0.0, - quality=0.05, - runtime=0.05, - syntax=0.0, - similarity=similarity, - baseline=0.05, - penalty=0.05, - ), - syntax_score=component_score(0.01), - tests_passed=0, - tests_total=len(task.public_cases) + len(task.hidden_cases), - quality_score=component_score(0.01), - runtime_score=component_score(0.01), + if not compiled: + details["test_results"] = [] + details["test_summary"] = "Code does not compile yet." 
+ return base_grade( + score=composite_grade_score( + correctness=0.0, + quality=0.05, + runtime=0.05, + syntax=0.0, + similarity=similarity, + baseline=0.05, + penalty=0.05, + ), + syntax_score=component_score(0.01), + tests_passed=0, + tests_total=len(task.public_cases) + len(task.hidden_cases), + quality_score=component_score(0.01), + runtime_score=component_score(0.01), timed_out=False, details=details, ) cases = task.public_cases + task.hidden_cases - result = execute_cases(code, task.function_name, cases, timeout_s=timeout_s) - if result.get("timed_out"): - details["test_results"] = [] - details["test_summary"] = result["error"] - return base_grade( - score=composite_grade_score( - correctness=0.15, - quality=quality["score"], - runtime=0.0, - syntax=0.95, - similarity=similarity, - baseline=0.08, - penalty=0.12, - ), - syntax_score=component_score(0.95), - tests_passed=0, - tests_total=len(cases), - quality_score=quality["score"], - runtime_score=component_score(0.01), + result = execute_cases(code, task.function_name, cases, timeout_s=timeout_s) + if result.get("timed_out"): + details["test_results"] = [] + details["test_summary"] = result["error"] + return base_grade( + score=composite_grade_score( + correctness=0.15, + quality=quality["score"], + runtime=0.0, + syntax=0.95, + similarity=similarity, + baseline=0.08, + penalty=0.12, + ), + syntax_score=component_score(0.95), + tests_passed=0, + tests_total=len(cases), + quality_score=quality["score"], + runtime_score=component_score(0.01), timed_out=True, details=details, ) - if "error" in result: - details["test_results"] = [] - details["test_summary"] = result["error"] - return base_grade( - score=composite_grade_score( - correctness=0.18, - quality=quality["score"], - runtime=0.0, - syntax=0.95, - similarity=similarity, - baseline=0.08, - penalty=0.08, - ), - syntax_score=component_score(0.95), - tests_passed=0, - tests_total=len(cases), - quality_score=quality["score"], - runtime_score=component_score(0.01), + if "error" in result: + details["test_results"] = [] + details["test_summary"] = result["error"] + return base_grade( + score=composite_grade_score( + correctness=0.18, + quality=quality["score"], + runtime=0.0, + syntax=0.95, + similarity=similarity, + baseline=0.08, + penalty=0.08, + ), + syntax_score=component_score(0.95), + tests_passed=0, + tests_total=len(cases), + quality_score=quality["score"], + runtime_score=component_score(0.01), timed_out=False, details=details, ) - data = result["data"] - details["test_results"] = data["results"] - details["test_summary"] = summarize_results("Validation checks", data["results"]) - pass_rate = data["passed"] / max(data["total"], 1) - return base_grade( - score=composite_grade_score( - correctness=pass_rate, - quality=quality["score"], - runtime=0.05, - syntax=0.95, - similarity=similarity, - baseline=0.10, - ), - syntax_score=component_score(0.95), - tests_passed=data["passed"], - tests_total=data["total"], - quality_score=quality["score"], + data = result["data"] + details["test_results"] = data["results"] + details["test_summary"] = summarize_results("Validation checks", data["results"]) + pass_rate = data["passed"] / max(data["total"], 1) + return base_grade( + score=composite_grade_score( + correctness=pass_rate, + quality=quality["score"], + runtime=0.05, + syntax=0.95, + similarity=similarity, + baseline=0.10, + ), + syntax_score=component_score(0.95), + tests_passed=data["passed"], + tests_total=data["total"], + quality_score=quality["score"], 
runtime_score=component_score(0.01), timed_out=False, details=details, diff --git a/inference.py b/inference.py index 9ede6c47a468c19322eda425403a76ac266b41ea..beada78d444cc14cf9c210a6132b24699430c198 100644 --- a/inference.py +++ b/inference.py @@ -1,12 +1,12 @@ -#!/usr/bin/env python3 -"""Root validator entrypoint.""" - -from __future__ import annotations - -import sys - -from app.env.runner import main - - -if __name__ == "__main__": - sys.exit(main()) +#!/usr/bin/env python3 +"""Root validator entrypoint.""" + +from __future__ import annotations + +import sys + +from app.env.runner import main + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/launch.py b/launch.py index 71d10c43e0b3a6a05a767902d2a022f7662bdeb1..c06c8d1cdf8c2a4a1dabf4cf54ca9534967d7212 100644 --- a/launch.py +++ b/launch.py @@ -1,35 +1,35 @@ -"""Launch the FastAPI backend and Streamlit UI in one Docker container.""" - -from __future__ import annotations - -import subprocess -import sys - - -def main() -> int: - """Start the API backend in the background and keep Streamlit in the foreground.""" - - api_process = subprocess.Popen( - ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8001"], - ) - try: - return subprocess.call( - [ - "streamlit", - "run", - "app/streamlit_app.py", - "--server.port", - "8000", - "--server.address", - "0.0.0.0", - "--server.headless", - "true", - ] - ) - finally: - api_process.terminate() - api_process.wait(timeout=10) - - -if __name__ == "__main__": - sys.exit(main()) +"""Launch the FastAPI backend and Streamlit UI in one Docker container.""" + +from __future__ import annotations + +import subprocess +import sys + + +def main() -> int: + """Start the API backend in the background and keep Streamlit in the foreground.""" + + api_process = subprocess.Popen( + ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8001"], + ) + try: + return subprocess.call( + [ + "streamlit", + "run", + "app/streamlit_app.py", + "--server.port", + "8000", + "--server.address", + "0.0.0.0", + "--server.headless", + "true", + ] + ) + finally: + api_process.terminate() + api_process.wait(timeout=10) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/models.py b/models.py index 5a83f60d050accc83ab05bb2bef8743c52237739..6002de2dd30c2ab339e1acf93c068d0160666960 100644 --- a/models.py +++ b/models.py @@ -1,4 +1,4 @@ -"""Typed models for the python_code_review_env environment.""" +"""Typed models for the python_code_review_env environment.""" from __future__ import annotations @@ -23,22 +23,22 @@ class HistoryEntry(BaseModel): reward: float = Field(..., gt=0.0, lt=1.0, description="Reward returned for the step.") -class RewardDetails(BaseModel): - """Transparent reward decomposition for debugging and training.""" - - value: float = Field(..., gt=0.0, lt=1.0, description="Clamped net reward in (0.0, 1.0).") - syntax_reward: float = Field(default=0.0) - test_reward: float = Field(default=0.0) - correctness_bonus: float = Field(default=0.0) - quality_bonus: float = Field(default=0.0) - error_reduction_bonus: float = Field(default=0.0) - completion_bonus: float = Field(default=0.0) - runtime_bonus: float = Field(default=0.0) - progress_delta: float = Field(default=0.0) - invalid_action_penalty: float = Field(default=0.0) - timeout_penalty: float = Field(default=0.0) - regression_penalty: float = Field(default=0.0) - stagnation_penalty: float = Field(default=0.0) +class RewardDetails(BaseModel): + """Transparent reward decomposition for debugging and training.""" + + value: float = 
Field(..., gt=0.0, lt=1.0, description="Clamped net reward in (0.0, 1.0).") + syntax_reward: float = Field(default=0.0) + test_reward: float = Field(default=0.0) + correctness_bonus: float = Field(default=0.0) + quality_bonus: float = Field(default=0.0) + error_reduction_bonus: float = Field(default=0.0) + completion_bonus: float = Field(default=0.0) + runtime_bonus: float = Field(default=0.0) + progress_delta: float = Field(default=0.0) + invalid_action_penalty: float = Field(default=0.0) + timeout_penalty: float = Field(default=0.0) + regression_penalty: float = Field(default=0.0) + stagnation_penalty: float = Field(default=0.0) reason: str = Field(..., description="Human-readable reward explanation.") prev_score: float = Field(default=0.01, gt=0.0, lt=1.0) curr_score: float = Field(default=0.01, gt=0.0, lt=1.0) @@ -66,17 +66,17 @@ class PythonCodeReviewObservation(Observation): current_code: str = Field(..., description="Latest code under review.") errors: str = Field(default="", description="Syntax or execution errors.") test_results: str = Field(default="", description="Public test and benchmark feedback.") - visible_tests: List[str] = Field(default_factory=list) - history: List[HistoryEntry] = Field(default_factory=list) - attempts_remaining: int = Field(..., ge=0) - last_action_status: str = Field(default="") - last_action_error: Optional[str] = Field(default=None) - score: float = Field(..., gt=0.0, lt=1.0) - reward: float = Field(default=0.1, gt=0.0, lt=1.0) - done: bool = Field(default=False) - reward_details: RewardDetails = Field( - default_factory=lambda: RewardDetails(value=0.1, reason="Environment reset.") - ) + visible_tests: List[str] = Field(default_factory=list) + history: List[HistoryEntry] = Field(default_factory=list) + attempts_remaining: int = Field(..., ge=0) + last_action_status: str = Field(default="") + last_action_error: Optional[str] = Field(default=None) + score: float = Field(..., gt=0.0, lt=1.0) + reward: float = Field(default=0.1, gt=0.0, lt=1.0) + done: bool = Field(default=False) + reward_details: RewardDetails = Field( + default_factory=lambda: RewardDetails(value=0.1, reason="Environment reset.") + ) class PythonCodeReviewState(State): diff --git a/models/__init__.py b/models/__init__.py index e850debc4c529344baf4fdc31f9f9f5f46b953ed..b2d760c568bd457e584c28c004c66be799de6106 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -1,66 +1,76 @@ -"""PyTorch-backed model wrappers plus OpenEnv schema exports.""" - -from __future__ import annotations - -import importlib.util -import sys -from pathlib import Path - -from .pytorch_model import PyTorchCodeAnalyzerModel - - -def _load_schema_module(): - schema_path = Path(__file__).resolve().parent.parent / "models.py" - spec = importlib.util.spec_from_file_location("_python_env_schema_models", schema_path) - if spec is None or spec.loader is None: # pragma: no cover - raise ImportError(f"Unable to load schema models from {schema_path}") - if spec.name in sys.modules: - return sys.modules[spec.name] - module = importlib.util.module_from_spec(spec) - sys.modules[spec.name] = module - spec.loader.exec_module(module) - for model_name in ( - "HistoryEntry", - "RewardDetails", - "PythonCodeReviewAction", - "PythonCodeReviewObservation", - "PythonCodeReviewState", - "TaskDescriptor", - "TaskSummary", - "TaskGrade", - "HealthResponse", - ): - getattr(module, model_name).model_rebuild() - return module - - -_schema_models = _load_schema_module() - -HealthResponse = _schema_models.HealthResponse -HistoryEntry = 
_schema_models.HistoryEntry -PythonAction = _schema_models.PythonAction -PythonCodeReviewAction = _schema_models.PythonCodeReviewAction -PythonCodeReviewObservation = _schema_models.PythonCodeReviewObservation -PythonCodeReviewState = _schema_models.PythonCodeReviewState -PythonObservation = _schema_models.PythonObservation -PythonState = _schema_models.PythonState -RewardDetails = _schema_models.RewardDetails -TaskDescriptor = _schema_models.TaskDescriptor -TaskGrade = _schema_models.TaskGrade -TaskSummary = _schema_models.TaskSummary - -__all__ = [ - "HealthResponse", - "HistoryEntry", - "PyTorchCodeAnalyzerModel", - "PythonAction", - "PythonCodeReviewAction", - "PythonCodeReviewObservation", - "PythonCodeReviewState", - "PythonObservation", - "PythonState", - "RewardDetails", - "TaskDescriptor", - "TaskGrade", - "TaskSummary", -] +"""PyTorch-backed model wrappers plus OpenEnv schema exports.""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .pytorch_model import PyTorchCodeAnalyzerModel + + +def _load_schema_module(): + schema_path = Path(__file__).resolve().parent.parent / "models.py" + spec = importlib.util.spec_from_file_location("_python_env_schema_models", schema_path) + if spec is None or spec.loader is None: # pragma: no cover + raise ImportError(f"Unable to load schema models from {schema_path}") + if spec.name in sys.modules: + return sys.modules[spec.name] + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + for model_name in ( + "HistoryEntry", + "RewardDetails", + "PythonCodeReviewAction", + "PythonCodeReviewObservation", + "PythonCodeReviewState", + "TaskDescriptor", + "TaskSummary", + "TaskGrade", + "HealthResponse", + ): + getattr(module, model_name).model_rebuild() + return module + + +_schema_models = _load_schema_module() + +HealthResponse = _schema_models.HealthResponse +HistoryEntry = _schema_models.HistoryEntry +PythonAction = _schema_models.PythonAction +PythonCodeReviewAction = _schema_models.PythonCodeReviewAction +PythonCodeReviewObservation = _schema_models.PythonCodeReviewObservation +PythonCodeReviewState = _schema_models.PythonCodeReviewState +PythonObservation = _schema_models.PythonObservation +PythonState = _schema_models.PythonState +RewardDetails = _schema_models.RewardDetails +TaskDescriptor = _schema_models.TaskDescriptor +TaskGrade = _schema_models.TaskGrade +TaskSummary = _schema_models.TaskSummary + + +def __getattr__(name: str): + if name == "PyTorchCodeAnalyzerModel": + from .pytorch_model import PyTorchCodeAnalyzerModel as model_class + + return model_class + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + +__all__ = [ + "HealthResponse", + "HistoryEntry", + "PyTorchCodeAnalyzerModel", + "PythonAction", + "PythonCodeReviewAction", + "PythonCodeReviewObservation", + "PythonCodeReviewState", + "PythonObservation", + "PythonState", + "RewardDetails", + "TaskDescriptor", + "TaskGrade", + "TaskSummary", +] diff --git a/models/pytorch_model.py b/models/pytorch_model.py index b164b048a8d44d68de06b51136f5499f889d3a04..f3ff2e37177beaea1dc10b9b4a276d171bbfe112 100644 --- a/models/pytorch_model.py +++ b/models/pytorch_model.py @@ -1,4 +1,4 @@ -"""PyTorch + transformers model wrapper for code-quality scoring.""" +"""PyTorch + transformers model wrapper for multi-domain code scoring.""" from __future__ import annotations @@ -17,64 +17,34 @@ except 
Exception: DOMAIN_PROTOTYPES: Dict[str, List[str]] = { "dsa": [ - "Algorithmic Python with nested loops, recursion, dynamic programming, maps, and asymptotic analysis.", - "Competitive programming utility focused on arrays, graphs, search, and runtime complexity.", + "Binary search, hashmap optimization, recursion, dynamic programming, arrays, trees, graphs, stack, queue, complexity.", + "Competitive programming algorithm with loops, memoization, prefix sums, and asymptotic analysis.", ], "data_science": [ - "Pandas dataframe transformation, numpy vectorization, feature engineering, data cleaning, and leakage prevention.", - "Notebook-style data pipeline using joins, aggregations, and columnar operations.", + "Pandas dataframe transformation, numpy vectorization, feature leakage, train test split, iterrows misuse.", + "Data cleaning pipeline using pandas, numpy, aggregation, joins, and vectorized operations.", ], "ml_dl": [ - "PyTorch model inference or training loop with eval mode, no_grad, tensors, optimizer, and loss functions.", - "Machine learning code with torch, sklearn, batches, checkpoints, and metrics.", + "PyTorch model, training loop, optimizer, backward pass, eval mode, no_grad, loss function, dataloader.", + "Machine learning inference and training code with torch, sklearn, tensors, gradients, and model checkpoints.", ], "web": [ - "FastAPI backend endpoint with pydantic validation, dependency injection, request parsing, and API safety.", - "Python web-service route handling, serialization, authentication, and response contracts.", + "FastAPI endpoint, request validation, Pydantic models, async routes, API security, backend service design.", + "REST API backend with routers, dependency injection, input validation, serialization, and error handling.", ], "general": [ - "General Python utility code with readability, typing, small functions, tests, and maintainable abstractions.", + "General Python utility code with readable structure, typing, tests, and maintainable abstractions.", ], } QUALITY_ANCHORS: Dict[str, List[str]] = { "high": [ - "Production-ready Python code with clear naming, docstrings, validation, efficient loops, and low complexity.", - "Clean code with explicit error handling, typing, modular design, and testable functions.", + "Readable typed Python code with validation, efficient algorithms, vectorized operations, safe inference, and clean API boundaries.", + "Production-ready code with small functions, docstrings, low complexity, and clear error handling.", ], "low": [ - "Bug-prone Python with nested loops, missing validation, weak naming, duplicated logic, and hard-to-review structure.", - "Risky code with syntax drift, unclear behavior, mutable side effects, and repeated scans over data.", - ], -} - -MAINTAINABILITY_ANCHORS: Dict[str, List[str]] = { - "high": [ - "Readable functions, small logical units, strong typing, comments only where needed, and simple control flow.", - "Maintainable Python service with clean architecture, cohesive modules, and explicit contracts.", - ], - "low": [ - "Large unstructured function, missing docstrings, weak names, deeply nested branches, and difficult debugging.", - "Hard-to-maintain script with inconsistent style, brittle branching, and hidden side effects.", - ], -} - -ISSUE_ANCHORS: Dict[str, List[str]] = { - "correctness": [ - "Off-by-one bug, missing final append, incorrect boundary handling, failing assertions, wrong return value.", - "Logic regression caused by a missing branch, incorrect state update, or unhandled 
edge case.", - ], - "performance": [ - "Repeated full-list scans, brute-force nested loops, iterrows misuse, avoidable O(n^2) behavior, slow pipeline.", - "Performance regression from redundant iteration, poor data structures, or missing vectorization.", - ], - "security": [ - "Unsafe input handling, unchecked request payload, eval usage, missing validation, insecure backend pattern.", - "Security risk caused by trusting raw user input or bypassing schema validation.", - ], - "style": [ - "Readability issues from long lines, missing docstrings, inconsistent spacing, tabs, and trailing whitespace.", - "Style drift that makes code review harder and maintenance slower.", + "Brute-force nested loops, missing validation, unsafe input handling, missing eval mode, missing no_grad, and code smells.", + "Hard to maintain code with high complexity, repeated scans, mutable side effects, and unclear structure.", ], } @@ -148,79 +118,31 @@ class PyTorchCodeAnalyzerModel: self._prototype_cache[bucket] = self._embed_texts(texts) return self._prototype_cache[bucket] - @staticmethod - def _unit_similarity(candidate: torch.Tensor, matrix: torch.Tensor) -> float: - similarity = torch.matmul(candidate, matrix.T).max().item() - return round((similarity + 1.0) / 2.0, 4) - - @staticmethod - def _quality_label(score: float) -> str: - if score >= 0.82: - return "excellent" - if score >= 0.66: - return "good" - if score >= 0.45: - return "needs_work" - return "risky" - - def predict( - self, - code: str, - context_window: str, - traceback_text: str, - static_summary: Dict[str, object], - ) -> Dict[str, object]: - """Predict domain probabilities, quality, and issue risks for Python code.""" + def predict(self, code: str, context_window: str, static_summary: Dict[str, object]) -> Dict[str, object]: + """Predict domain probabilities and a model quality score.""" document = ( f"Code:\n{code.strip()[:4000]}\n\n" f"Context:\n{context_window.strip()[:1000]}\n\n" - f"Traceback:\n{traceback_text.strip()[:1000]}\n\n" f"Static hints:\n{static_summary}\n" ) candidate = self._embed_texts([document]) domain_scores: Dict[str, float] = {} for domain, texts in DOMAIN_PROTOTYPES.items(): - domain_scores[domain] = self._unit_similarity(candidate, self._prototype_matrix(f"domain:{domain}", texts)) + matrix = self._prototype_matrix(f"domain:{domain}", texts) + similarity = torch.matmul(candidate, matrix.T).max().item() + domain_scores[domain] = round((similarity + 1.0) / 2.0, 4) high_matrix = self._prototype_matrix("quality:high", QUALITY_ANCHORS["high"]) low_matrix = self._prototype_matrix("quality:low", QUALITY_ANCHORS["low"]) high_similarity = torch.matmul(candidate, high_matrix.T).max().item() low_similarity = torch.matmul(candidate, low_matrix.T).max().item() - ml_quality_score = round(float(torch.sigmoid(torch.tensor((high_similarity - low_similarity) * 4.0)).item()), 4) - - high_maintainability = torch.matmul( - candidate, - self._prototype_matrix("maintainability:high", MAINTAINABILITY_ANCHORS["high"]).T, - ).max().item() - low_maintainability = torch.matmul( - candidate, - self._prototype_matrix("maintainability:low", MAINTAINABILITY_ANCHORS["low"]).T, - ).max().item() - maintainability_score = round( - float(torch.sigmoid(torch.tensor((high_maintainability - low_maintainability) * 4.0)).item()), - 4, - ) - - issue_logits = [] - issue_labels = list(ISSUE_ANCHORS.keys()) - for label in issue_labels: - similarity = torch.matmul(candidate, self._prototype_matrix(f"issue:{label}", ISSUE_ANCHORS[label]).T).max().item() - 
issue_logits.append(similarity) - probabilities = torch.softmax(torch.tensor(issue_logits) * 3.0, dim=0) - issue_probabilities = { - label: round(float(probabilities[index].item()), 4) - for index, label in enumerate(issue_labels) - } + ml_quality_score = torch.sigmoid(torch.tensor((high_similarity - low_similarity) * 4.0)).item() return { "domain_scores": domain_scores, - "ml_quality_score": ml_quality_score, - "quality_score": ml_quality_score, - "quality_label": self._quality_label(ml_quality_score), - "maintainability_score": maintainability_score, - "issue_probabilities": issue_probabilities, + "ml_quality_score": round(float(ml_quality_score), 4), "backend_name": self.backend_name, "model_id": self.model_id, "notes": list(self.notes), diff --git a/openenv_python_code_review_env.egg-info/PKG-INFO b/openenv_python_code_review_env.egg-info/PKG-INFO index 72e36f3f27460840ae1d0602ab79dce6c9fd0972..f1b58d1ba337e4b13c86bbade15f1de13f3e4cd2 100644 --- a/openenv_python_code_review_env.egg-info/PKG-INFO +++ b/openenv_python_code_review_env.egg-info/PKG-INFO @@ -16,16 +16,6 @@ Provides-Extra: dev Requires-Dist: pytest>=8.0.0; extra == "dev" Requires-Dist: pytest-cov>=4.0.0; extra == "dev" ---- -title: Python Code Review Environment Server -sdk: docker -app_port: 8000 -base_path: /web -pinned: false -tags: - - openenv ---- - # OpenEnv Python Code Review Environment Production-ready hackathon submission for OpenEnv evaluation, deterministic validator runs, and Hugging Face Docker deployment. @@ -34,26 +24,25 @@ Production-ready hackathon submission for OpenEnv evaluation, deterministic vali ```text root -|- inference.py # Root validator entrypoint -|- openenv.yaml # OpenEnv manifest -|- app/ -| |- agents/ # Action policy and fallback strategy -| |- env/ # RL loop runner and stdout contract -| |- models/ # Inference dataclasses/config -| |- services/ # OpenAI client wrapper with retries -| `- utils/ # Formatting, task loading, log suppression -|- server/ -| |- env.py # OpenEnv environment and reward shaping -| |- app.py # FastAPI/OpenEnv app, optional Gradio mount -| `- Dockerfile # Alternate Docker build path -|- Dockerfile # Root deployment Docker image -|- graders/ # Syntax, bug-fix, optimization graders -|- tasks/ # Deterministic benchmark tasks and references -|- services/ # Multi-domain analysis services -|- analyzers/ # Domain-specific analyzers -|- models/ # Lazy-loaded PyTorch scoring model -|- schemas/ # API request/response contracts -`- tests/ # Local validation coverage +├── inference.py # Root validator entrypoint +├── openenv.yaml # OpenEnv manifest +├── app/ +│ ├── agents/ # Action policy and fallback strategy +│ ├── env/ # RL loop runner and stdout contract +│ ├── models/ # Inference dataclasses/config +│ ├── services/ # OpenAI client wrapper with retries +│ └── utils/ # Formatting, task loading, log suppression +├── server/ +│ ├── env.py # OpenEnv environment and reward shaping +│ ├── app.py # FastAPI/OpenEnv app, optional Gradio mount +│ └── Dockerfile # Hugging Face Docker image +├── graders/ # Syntax, bug-fix, optimization graders +├── tasks/ # Deterministic benchmark tasks and references +├── services/ # Multi-domain analysis services +├── analyzers/ # Domain-specific analyzers +├── models/ # Lazy-loaded PyTorch scoring model +├── schemas/ # API request/response contracts +└── tests/ # Local validation coverage ``` Runtime flow: @@ -71,8 +60,8 @@ inference.py - `inference.py` now lives at the repo root and delegates to a strict runner under `app/env`. 
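For reference, a minimal sketch of the prototype-similarity scoring that the `models/pytorch_model.py` change above keeps: max cosine similarity against domain prototypes mapped into [0, 1], and a sigmoid over the high/low anchor gap for the quality score. Here `embed()` is a hypothetical stand-in for the CodeBERTa encoder (unit-normalized random vectors) so the snippet runs without downloading a model; the prototype strings are abbreviated from the catalog in the diff.

```python
# Sketch of prototype-similarity scoring; embed() is a stand-in for the real encoder.
import torch
import torch.nn.functional as F

torch.manual_seed(0)


def embed(texts: list[str]) -> torch.Tensor:
    # Placeholder: returns unit-normalized vectors instead of CodeBERTa embeddings.
    return F.normalize(torch.randn(len(texts), 768), dim=-1)


domain_prototypes = {
    "dsa": ["Binary search, hashmap optimization, recursion, dynamic programming."],
    "web": ["FastAPI endpoint, request validation, Pydantic models, async routes."],
}
quality_anchors = {
    "high": ["Readable typed Python code with validation and efficient algorithms."],
    "low": ["Brute-force nested loops, missing validation, unsafe input handling."],
}

candidate = embed(["def total(items):\n    return sum(i.price for i in items)"])

# Domain score: best cosine similarity per domain, shifted from [-1, 1] into [0, 1].
domain_scores = {}
for domain, texts in domain_prototypes.items():
    similarity = torch.matmul(candidate, embed(texts).T).max().item()
    domain_scores[domain] = round((similarity + 1.0) / 2.0, 4)

# Quality score: sigmoid over the gap between "high" and "low" anchor similarity.
high = torch.matmul(candidate, embed(quality_anchors["high"]).T).max().item()
low = torch.matmul(candidate, embed(quality_anchors["low"]).T).max().item()
ml_quality_score = round(torch.sigmoid(torch.tensor((high - low) * 4.0)).item(), 4)

print(domain_scores, ml_quality_score)
```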
- OpenAI usage is limited to the official Python client: - `client = OpenAI(base_url=API_BASE_URL, api_key=provider_token)`. -- Defaulted env vars are enforced for `API_BASE_URL` and `MODEL_NAME`; the runtime now selects `HF_TOKEN` for the Hugging Face router and `OPENAI_API_KEY` for direct OpenAI usage. + `client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)`. +- Defaulted env vars are enforced for `API_BASE_URL` and `MODEL_NAME`; `HF_TOKEN` is read without a default and handled explicitly. - Output now matches the required single-line contract exactly and always emits `[END]`, including failure paths. - The RL loop now uses `reset()` plus `step_result()` in a proper `while not done` loop. - Step errors now surface through `last_action_error` and are printed in `[STEP]`. @@ -107,7 +96,6 @@ Optional demo UI: ```bash set ENABLE_GRADIO_DEMO=true -set ENABLE_WEB_INTERFACE=true python -m uvicorn server.app:app --host 0.0.0.0 --port 8000 ``` @@ -120,9 +108,7 @@ Required environment variables: - `MODEL_NAME` Default: `Qwen/Qwen2.5-3B-Instruct` - `HF_TOKEN` - Required for `https://router.huggingface.co/v1` -- `OPENAI_API_KEY` - Required for `https://api.openai.com/v1` + Mandatory, no default is injected Example: @@ -133,13 +119,6 @@ set HF_TOKEN=hf_xxx python inference.py ``` -```bash -set API_BASE_URL=https://api.openai.com/v1 -set MODEL_NAME=gpt-4.1-mini -set OPENAI_API_KEY=sk-xxx -python inference.py -``` - Expected stdout shape: ```text @@ -156,7 +135,7 @@ Expected stdout shape: Build from the project root: ```bash -docker build -t openenv-python-code-review-env . +docker build -f server/Dockerfile . ``` Run locally: @@ -182,12 +161,11 @@ Recommended deployment steps: 1. Create a Docker Space. 2. Push this repository as-is. -3. Let Spaces build from the root `Dockerfile`. +3. Let Spaces build with `server/Dockerfile`. 4. Set Space secrets: `HF_TOKEN` 5. Set Space variables as needed: `API_BASE_URL`, `MODEL_NAME`, `ENABLE_GRADIO_DEMO=false` - `ENABLE_WEB_INTERFACE=false` is also supported for OpenEnv-managed deploys. 6. Confirm the app listens on port `8000`. 7. Smoke-test: `/health` diff --git a/openenv_python_code_review_env.egg-info/SOURCES.txt b/openenv_python_code_review_env.egg-info/SOURCES.txt index 941269c4a795f126e60518385ccca67f6d39299b..69092eb01996368c01e47ffc78f3e2751286ed8e 100644 --- a/openenv_python_code_review_env.egg-info/SOURCES.txt +++ b/openenv_python_code_review_env.egg-info/SOURCES.txt @@ -5,8 +5,7 @@ pyproject.toml ./compat.py ./inference.py ./launch.py -./models.py -./sitecustomize.py +./openenv_models.py ./triage.py ./triage_catalog.py ./triage_models.py diff --git a/pyproject.toml b/pyproject.toml index 7702215f4c915195d2e305948e0d45ea0113b704..ce80811fee9a7e89de02c22a43050f8337018034 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,9 +8,11 @@ version = "1.0.0" description = "TorchReview Copilot: AI-powered Python code triage with PyTorch and OpenEnv validation." 
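As a companion to the inference contract documented in the README changes above, a minimal sketch of the client wiring it describes, assuming the defaults listed there (`API_BASE_URL` pointing at the Hugging Face router, `MODEL_NAME` of `Qwen/Qwen2.5-3B-Instruct`, and a mandatory `HF_TOKEN`); the prompt string and the exit handling are illustrative placeholders, not the validator's actual behavior.

```python
# Sketch of the documented client setup; prompt and error handling are placeholders.
import os
import sys

from openai import OpenAI

api_base_url = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
model_name = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-3B-Instruct")
hf_token = os.environ.get("HF_TOKEN")  # mandatory: no default is injected
if not hf_token:
    sys.exit("HF_TOKEN must be set")

client = OpenAI(base_url=api_base_url, api_key=hf_token)
response = client.chat.completions.create(
    model=model_name,
    messages=[{"role": "user", "content": "Review this Python snippet for correctness issues."}],
)
print(response.choices[0].message.content)
```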
readme = "README.md" requires-python = ">=3.10" + dependencies = [ "fastapi>=0.111.0", "gradio>=5.26.0", + "hf-xet>=1.4.3", "openai>=1.76.0", "openenv-core[core]>=0.2.2", "streamlit>=1.44.0", @@ -33,22 +35,7 @@ pythonpath = ["."] [tool.setuptools] include-package-data = true -packages = [ - "python_env", - "python_env.server", - "python_env.tasks", - "python_env.graders", - "python_env.api", - "python_env.app", - "python_env.app.agents", - "python_env.app.env", - "python_env.app.models", - "python_env.app.services", - "python_env.app.utils", - "python_env.analyzers", - "python_env.models", - "python_env.schemas", - "python_env.services", - "python_env.utils", -] -package-dir = { "python_env" = ".", "python_env.server" = "server", "python_env.tasks" = "tasks", "python_env.graders" = "graders", "python_env.api" = "api", "python_env.app" = "app", "python_env.app.agents" = "app/agents", "python_env.app.env" = "app/env", "python_env.app.models" = "app/models", "python_env.app.services" = "app/services", "python_env.app.utils" = "app/utils", "python_env.analyzers" = "analyzers", "python_env.models" = "models", "python_env.schemas" = "schemas", "python_env.services" = "services", "python_env.utils" = "utils" } + +[tool.setuptools.packages.find] +where = ["."] +include = ["*"] diff --git a/schemas/__init__.py b/schemas/__init__.py index d2008615adec6aada77a9ada69cf8d3d8d5fb4c1..e635325f1c40ea4e2797578f1fc3224f9548d1df 100644 --- a/schemas/__init__.py +++ b/schemas/__init__.py @@ -1,13 +1,13 @@ -"""Public schemas for the multi-domain analysis platform.""" - -from .request import AnalyzeCodeRequest -from .response import AnalyzeCodeResponse, AnalysisIssue, DomainAnalysis, ScoreBreakdown, StaticAnalysisSummary - -__all__ = [ - "AnalyzeCodeRequest", - "AnalyzeCodeResponse", - "AnalysisIssue", - "DomainAnalysis", - "ScoreBreakdown", - "StaticAnalysisSummary", -] +"""Public schemas for the multi-domain analysis platform.""" + +from .request import AnalyzeCodeRequest +from .response import AnalyzeCodeResponse, AnalysisIssue, DomainAnalysis, ScoreBreakdown, StaticAnalysisSummary + +__all__ = [ + "AnalyzeCodeRequest", + "AnalyzeCodeResponse", + "AnalysisIssue", + "DomainAnalysis", + "ScoreBreakdown", + "StaticAnalysisSummary", +] diff --git a/schemas/request.py b/schemas/request.py index 63f5e75e069a606e916fb5c84c8a1a4137ffa191..c53252a73269901cb3bf98e8a10b2b5d2140ca66 100644 --- a/schemas/request.py +++ b/schemas/request.py @@ -1,51 +1,19 @@ -"""Request schemas for the AI-powered code review workflow.""" +"""Request schemas for code analysis endpoints and UI.""" from __future__ import annotations from typing import Literal -from pydantic import BaseModel, ConfigDict, Field, field_validator +from pydantic import BaseModel, Field -DomainHint = Literal["auto", "general", "dsa", "data_science", "ml_dl", "web"] +DomainHint = Literal["auto", "dsa", "data_science", "ml_dl", "web"] class AnalyzeCodeRequest(BaseModel): - """Validated input payload for Python code review requests.""" - - model_config = ConfigDict(str_strip_whitespace=True) - - code: str = Field(..., min_length=1, description="Python source code to analyze.") - context_window: str = Field( - default="", - max_length=4000, - description="Optional repository, pull request, or runtime context.", - ) - traceback_text: str = Field( - default="", - max_length=4000, - description="Optional traceback or failing test output.", - ) - domain_hint: DomainHint = Field( - default="auto", - description="Optional analysis lens for domain-aware suggestions.", - ) - 
filename: str = Field(default="snippet.py", max_length=255, description="Virtual filename for display.") - enable_suggestions: bool = Field( - default=True, - description="Whether the service should return a prioritized improvement plan.", - ) - - @field_validator("code") - @classmethod - def _reject_empty_code(cls, value: str) -> str: - stripped = value.strip() - if not stripped: - raise ValueError("code must not be empty") - return stripped - - @field_validator("filename") - @classmethod - def _normalize_filename(cls, value: str) -> str: - candidate = value.strip() or "snippet.py" - return candidate[:255] + """Validated input payload for multi-domain code analysis.""" + + code: str = Field(..., min_length=1, description="Source code to analyze.") + context_window: str = Field(default="", max_length=2000, description="Optional repository or task context.") + traceback_text: str = Field(default="", max_length=2000, description="Optional runtime or test failure output.") + domain_hint: DomainHint = Field(default="auto", description="Optional domain override when auto detection is not desired.") diff --git a/schemas/response.py b/schemas/response.py index d673a29e8ba63ab9d9d88c173dd734d392c388b1..568543fa94d66642b2cf7c16c7f8e848709313df 100644 --- a/schemas/response.py +++ b/schemas/response.py @@ -1,4 +1,4 @@ -"""Response schemas for the AI-powered code review platform.""" +"""Response schemas for the multi-domain analysis platform.""" from __future__ import annotations @@ -7,103 +7,67 @@ from typing import Dict, List, Literal from pydantic import BaseModel, Field +DomainType = Literal["dsa", "data_science", "ml_dl", "web", "general"] Severity = Literal["low", "medium", "high"] -IssueCategory = Literal["correctness", "maintainability", "performance", "security", "style"] -QualityLabel = Literal["excellent", "good", "needs_work", "risky"] -DetectedDomain = Literal["general", "dsa", "data_science", "ml_dl", "web"] class AnalysisIssue(BaseModel): """One detected issue or risk in the code snippet.""" title: str - category: IssueCategory = "maintainability" severity: Severity description: str line_hint: int | None = None class StaticAnalysisSummary(BaseModel): - """Python-specific static-analysis signals.""" + """Language-agnostic static-analysis signals.""" syntax_valid: bool syntax_error: str = "" cyclomatic_complexity: int = Field(..., ge=1) line_count: int = Field(..., ge=0) - max_nesting_depth: int = Field(..., ge=0) max_loop_depth: int = Field(..., ge=0) time_complexity: str = "Unknown" space_complexity: str = "Unknown" - lint_score: float = Field(..., ge=0.0, le=1.0) - docstring_coverage: float = Field(..., ge=0.0, le=1.0) detected_imports: List[str] = Field(default_factory=list) code_smells: List[str] = Field(default_factory=list) - issues: List[AnalysisIssue] = Field(default_factory=list) class DomainAnalysis(BaseModel): - """Domain-aware review signals used for context-specific suggestions.""" + """Domain-specific analysis payload returned by an analyzer.""" - domain: DetectedDomain + domain: DomainType domain_score: float = Field(..., ge=0.0, le=1.0) issues: List[AnalysisIssue] = Field(default_factory=list) suggestions: List[str] = Field(default_factory=list) highlights: Dict[str, float | str] = Field(default_factory=dict) -class ModelPrediction(BaseModel): - """PyTorch model output derived from pretrained code embeddings.""" - - quality_label: QualityLabel - quality_score: float = Field(..., ge=0.0, le=1.0) - maintainability_score: float = Field(..., ge=0.0, le=1.0) - 
issue_probabilities: Dict[str, float] = Field(default_factory=dict) - notes: List[str] = Field(default_factory=list) - - class ScoreBreakdown(BaseModel): - """Reward inputs and the final RL-ready scalar reward.""" + """Reward inputs and final normalized score.""" ml_score: float = Field(..., ge=0.0, le=1.0) domain_score: float = Field(..., ge=0.0, le=1.0) lint_score: float = Field(..., ge=0.0, le=1.0) complexity_penalty: float = Field(..., ge=0.0, le=1.0) - maintainability_score: float = Field(..., ge=0.0, le=1.0) - security_score: float = Field(..., ge=0.0, le=1.0) - readability_score: float = Field(..., ge=0.0, le=1.0) quality_signal: float = Field(..., ge=0.0, le=1.0) error_reduction_signal: float = Field(..., ge=0.0, le=1.0) completion_signal: float = Field(..., ge=0.0, le=1.0) reward: float = Field(..., ge=0.0, le=1.0) -class SuggestionItem(BaseModel): - """One prioritized improvement suggestion.""" - - priority: Literal["P0", "P1", "P2"] - title: str - rationale: str - action: str - category: IssueCategory - - class AnalyzeCodeResponse(BaseModel): """Top-level structured output for API and UI consumers.""" - language: Literal["python"] = "python" - detected_domain: DetectedDomain - domain_confidences: Dict[str, float] = Field(default_factory=dict) + detected_domain: DomainType + domain_confidences: Dict[str, float] score_breakdown: ScoreBreakdown static_analysis: StaticAnalysisSummary - model_prediction: ModelPrediction domain_analysis: DomainAnalysis - suggestions: List[SuggestionItem] = Field(default_factory=list) improvement_plan: List[str] = Field(default_factory=list) - auto_fix_preview: List[str] = Field(default_factory=list) - score_visualization: Dict[str, float] = Field(default_factory=dict) model_backend: str model_id: str summary: str context_window: str = "" - filename: str = "snippet.py" analysis_time_ms: float = Field(..., ge=0.0) diff --git a/server/app.py b/server/app.py index 1c4287e8a2d5124c8a17bc9163e029768d7af990..ca80dee39f8dd32319aaf01c55449049b96b0c12 100644 --- a/server/app.py +++ b/server/app.py @@ -53,10 +53,16 @@ def build_application(): served_app = api_app if gr is not None and _gradio_enabled(): try: - from .demo import build_demo + from .demo import CSS, build_demo except ImportError: - from server.demo import build_demo - served_app = gr.mount_gradio_app(api_app, build_demo(), path="/") + from server.demo import CSS, build_demo + served_app = gr.mount_gradio_app( + api_app, + build_demo(), + path="/", + theme=gr.themes.Soft(primary_hue="orange", secondary_hue="amber"), + css=CSS, + ) wrapper_app = FastAPI(title="python_code_review_env", version="1.0.0") @@ -74,7 +80,7 @@ app = build_application() def main(host: str = "0.0.0.0", port: int = 8000) -> None: import uvicorn - uvicorn.run(app, host=host, port=port) + uvicorn.run(app, host=host, port=port, access_log=False) if __name__ == "__main__": diff --git a/server/demo.py b/server/demo.py index 674e040abe7c3d280b971a6ce3224da59b40cc41..3d3ac716faafd7ba663562b1f7bbcdbd077a2cda 100644 --- a/server/demo.py +++ b/server/demo.py @@ -347,7 +347,7 @@ def build_demo() -> gr.Blocks: examples = get_default_engine().example_map() first_example = next(iter(examples.values())) - with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange", secondary_hue="amber"), css=CSS, title="TorchReview Copilot") as demo: + with gr.Blocks(title="TorchReview Copilot") as demo: gr.HTML( """