Spaces:
Sleeping
Sleeping
| """ | |
| Task 2 (Medium): Security Vulnerability Review in a Flask Web Endpoint. | |
| The agent reviews a Flask user-authentication endpoint containing: | |
| 1. SQL injection vulnerability (string formatting into query) | |
| 2. Plaintext password storage (no hashing) | |
| 3. Missing rate limiting / brute-force protection | |
| 4. Sensitive data leakage in error response | |
| 5. Hardcoded secret key | |
| """ | |
| from __future__ import annotations | |
| from typing import Any, Dict | |
| TASK_ID = "task_2_medium_security" | |
| MAX_STEPS = 12 | |
| BUGGY_CODE = '''\ | |
| import sqlite3 | |
| from flask import Flask, request, jsonify | |
| app = Flask(__name__) | |
| app.secret_key = "supersecret123" # VULN 5: hardcoded secret key | |
| DB_PATH = "users.db" | |
| def get_db(): | |
| return sqlite3.connect(DB_PATH) | |
| @app.route("/login", methods=["POST"]) | |
| def login(): | |
| username = request.json.get("username") | |
| password = request.json.get("password") | |
| db = get_db() | |
| cursor = db.cursor() | |
| # VULN 1: SQL injection — user input directly interpolated into query | |
| query = f"SELECT * FROM users WHERE username = \'{username}\' AND password = \'{password}\'" | |
| cursor.execute(query) | |
| user = cursor.fetchone() | |
| if user: | |
| return jsonify({"status": "ok", "user_id": user[0], "email": user[2]}) | |
| else: | |
| # VULN 4: leaks whether username exists or password is wrong | |
| cursor.execute(f"SELECT id FROM users WHERE username = \'{username}\'") | |
| exists = cursor.fetchone() | |
| if exists: | |
| return jsonify({"error": f"Wrong password for user {username}"}), 401 | |
| return jsonify({"error": f"User {username} does not exist"}), 404 | |
| @app.route("/register", methods=["POST"]) | |
| def register(): | |
| username = request.json.get("username") | |
| password = request.json.get("password") # VULN 2: stored in plaintext | |
| email = request.json.get("email") | |
| db = get_db() | |
| cursor = db.cursor() | |
| # VULN 1 again: SQL injection in insert | |
| cursor.execute( | |
| f"INSERT INTO users (username, password, email) VALUES (\'{username}\', \'{password}\', \'{email}\')" | |
| ) | |
| db.commit() | |
| return jsonify({"status": "registered"}) | |
| # VULN 3: No rate limiting on login endpoint (brute-force possible) | |
| if __name__ == "__main__": | |
| app.run(debug=True) | |
| ''' | |
| FIXED_CODE = '''\ | |
| import os | |
| import sqlite3 | |
| from flask import Flask, request, jsonify | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from flask_limiter import Limiter | |
| from flask_limiter.util import get_remote_address | |
| app = Flask(__name__) | |
| app.secret_key = os.environ.get("SECRET_KEY") # read from env, never hardcode | |
| limiter = Limiter(get_remote_address, app=app, default_limits=["200 per day", "50 per hour"]) | |
| DB_PATH = "users.db" | |
| def get_db(): | |
| return sqlite3.connect(DB_PATH) | |
| @app.route("/login", methods=["POST"]) | |
| @limiter.limit("5 per minute") # brute-force protection | |
| def login(): | |
| username = request.json.get("username") | |
| password = request.json.get("password") | |
| db = get_db() | |
| cursor = db.cursor() | |
| # Parameterised query — prevents SQL injection | |
| cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,)) | |
| user = cursor.fetchone() | |
| if user and check_password_hash(user[1], password): | |
| return jsonify({"status": "ok", "user_id": user[0]}) | |
| # Generic error — does not reveal whether user exists | |
| return jsonify({"error": "Invalid credentials"}), 401 | |
| @app.route("/register", methods=["POST"]) | |
| def register(): | |
| username = request.json.get("username") | |
| password = request.json.get("password") | |
| email = request.json.get("email") | |
| db = get_db() | |
| cursor = db.cursor() | |
| password_hash = generate_password_hash(password) | |
| cursor.execute( | |
| "INSERT INTO users (username, password_hash, email) VALUES (?, ?, ?)", | |
| (username, password_hash, email), | |
| ) | |
| db.commit() | |
| return jsonify({"status": "registered"}) | |
| if __name__ == "__main__": | |
| app.run(debug=False) | |
| ''' | |
| KNOWN_VULNERABILITIES = { | |
| "sql_injection_login": { | |
| "line": 23, | |
| "description_keywords": ["sql injection", "parameterized", "f-string", "format", "interpolat", "query"], | |
| "severity": "critical", | |
| "issue_type": "security", | |
| }, | |
| "sql_injection_register": { | |
| "line": 44, | |
| "description_keywords": ["sql injection", "parameterized", "f-string", "format", "interpolat", "insert"], | |
| "severity": "critical", | |
| "issue_type": "security", | |
| }, | |
| "plaintext_password": { | |
| "line": 39, | |
| "description_keywords": ["plaintext", "hash", "bcrypt", "werkzeug", "password", "store"], | |
| "severity": "critical", | |
| "issue_type": "security", | |
| }, | |
| "no_rate_limiting": { | |
| "line": None, | |
| "description_keywords": ["rate limit", "brute force", "throttl", "limiter"], | |
| "severity": "major", | |
| "issue_type": "security", | |
| }, | |
| "sensitive_data_leak": { | |
| "line": 30, | |
| "description_keywords": ["leak", "enumerat", "username exist", "generic error", "information disclos"], | |
| "severity": "major", | |
| "issue_type": "security", | |
| }, | |
| "hardcoded_secret": { | |
| "line": 5, | |
| "description_keywords": ["hardcode", "secret", "env", "environment variable", "secret_key"], | |
| "severity": "major", | |
| "issue_type": "security", | |
| }, | |
| } | |
| PULL_REQUEST = { | |
| "pull_request_title": "Implement user login and registration API endpoints", | |
| "author": "backend-dev", | |
| "description": ( | |
| "Adds /login and /register REST endpoints backed by SQLite. " | |
| "Ready for production review." | |
| ), | |
| "files_changed": [ | |
| { | |
| "filename": "auth.py", | |
| "language": "python", | |
| "content": BUGGY_CODE, | |
| "line_count": BUGGY_CODE.count("\n") + 1, | |
| } | |
| ], | |
| "test_results": "Manual testing passed on happy path.", | |
| "linter_output": "No linter warnings.", | |
| } | |
| def get_task_config() -> Dict[str, Any]: | |
| return { | |
| "task_id": TASK_ID, | |
| "max_steps": MAX_STEPS, | |
| "pull_request": PULL_REQUEST, | |
| "known_vulnerabilities": KNOWN_VULNERABILITIES, | |
| "fixed_code": FIXED_CODE, | |
| "difficulty": "medium", | |
| "description": ( | |
| "Review a Flask authentication endpoint for security vulnerabilities. " | |
| "Identify all issues by category and severity, then provide a secure patched version." | |
| ), | |
| } | |