Cyber_analyst-round1 / validators.py
Humanlearning's picture
feat: implement core RL training infrastructure and architecture documentation
f3080d1
raw
history blame
8.61 kB
"""Deterministic validators and hidden checks for CyberSecurity_OWASP."""
from __future__ import annotations
from pathlib import Path
from typing import Any
try:
from .models import CyberSecurityOWASPAction, CyberSecurityOWASPState
except ImportError: # pragma: no cover
from models import CyberSecurityOWASPAction, CyberSecurityOWASPState
BLOCKED_PATH_MARKERS = (
"hidden",
"oracle",
"reward_engine",
"validators.py",
"rewards.py",
".git",
"..",
)
def _workspace(state: CyberSecurityOWASPState) -> Path:
return Path(str(state.hidden_facts["workspace"]))
def _route_source(state: CyberSecurityOWASPState) -> str:
return (_workspace(state) / "app/routes/invoices.py").read_text(encoding="utf-8")
def is_path_allowed(state: CyberSecurityOWASPState, path: str, *, write: bool = False) -> tuple[bool, str]:
normalized = path.replace("\\", "/").lstrip("/")
if any(marker in normalized for marker in BLOCKED_PATH_MARKERS):
return False, "blocked path"
if Path(path).is_absolute():
return False, "absolute host paths are blocked"
editable = set(state.hidden_facts.get("editable_files", []))
if normalized not in editable:
return False, "path is outside editable generated workspace"
if write and normalized.startswith("tests/"):
return False, "visible tests are readable but not patchable"
return True, normalized
def detect_cheating(state: CyberSecurityOWASPState, action: CyberSecurityOWASPAction) -> list[str]:
flags: list[str] = []
args = action.arguments or {}
text = " ".join(str(v) for v in args.values())
for key in (
"owner_user_id",
"intruder_user_id",
"admin_user_id",
"owner_invoice_id",
"other_invoice_id",
"foreign_invoice_id",
"tenant_a",
"tenant_b",
):
value = str(state.hidden_facts.get(key, ""))
if value and value in text and action.tool_name == "patch_file":
flags.append("hardcoded_hidden_identifier")
break
if action.tool_name in {"read_file", "search_code", "patch_file"}:
path_text = str(args.get("path", "")) + " " + str(args.get("query", ""))
if any(marker in path_text.replace("\\", "/") for marker in BLOCKED_PATH_MARKERS):
flags.append("blocked_file_probe")
if action.tool_name == "send_local_request":
target = str(args.get("path", ""))
if target.startswith("http://") or target.startswith("https://"):
flags.append("external_network_attempt")
return sorted(set(flags))
def _source_is_secure(source: str) -> bool:
has_tenant_check = "tenant_id" in source and "actor" in source and "!=" in source
has_owner = "owner_user_id" in source and "actor" in source
has_admin = "is_billing_admin" in source
has_forbidden = '"status": 403' in source or "'status': 403" in source
return has_tenant_check and has_owner and has_admin and has_forbidden
def _source_is_deny_all(source: str) -> bool:
early_unconditional = (
" return {\"status\": 403" in source
and " return {\"status\": 403" in source.split("if invoice[", 1)[0]
)
missing_positive_policy = "owner_user_id" not in source or "is_billing_admin" not in source
return early_unconditional or (
("return {\"status\": 403" in source or "return {'status': 403" in source)
and missing_positive_policy
)
def _source_has_hardcoding(state: CyberSecurityOWASPState, source: str) -> bool:
values = [
str(state.hidden_facts.get(key, ""))
for key in (
"owner_user_id",
"intruder_user_id",
"admin_user_id",
"owner_invoice_id",
"other_invoice_id",
"foreign_invoice_id",
"tenant_a",
"tenant_b",
)
]
return any(value and value in source for value in values)
def simulate_request(
state: CyberSecurityOWASPState, method: str, path: str, user_id: str | None = None
) -> dict[str, Any]:
if method.upper() != "GET":
return {"status": 405, "body": {"detail": "method not allowed"}}
source = _route_source(state)
if path == "/health":
if "def health" not in source or "status\": 200" not in source:
return {"status": 404, "body": {"detail": "health unavailable"}}
return {"status": 200, "body": {"status": "ok"}}
if not path.startswith("/invoices/"):
return {"status": 404, "body": {"detail": "route not found"}}
invoice_id = path.rsplit("/", 1)[-1]
users = state.hidden_facts["users"]
invoices = state.hidden_facts["invoices"]
actor = users.get(user_id or "")
invoice = invoices.get(invoice_id)
if actor is None:
return {"status": 401, "body": {"detail": "unknown user"}}
if invoice is None:
return {"status": 404, "body": {"detail": "invoice not found"}}
if _source_is_deny_all(source):
return {"status": 403, "body": {"detail": "forbidden"}}
if _source_is_secure(source):
same_tenant = invoice["tenant_id"] == actor["tenant_id"]
allowed = invoice["owner_user_id"] == actor["user_id"] or "billing_admin" in actor["roles"]
if same_tenant and allowed:
return {"status": 200, "body": invoice}
return {"status": 403, "body": {"detail": "forbidden"}}
return {"status": 200, "body": invoice}
def verify_finding(state: CyberSecurityOWASPState, finding: dict) -> dict:
text = " ".join(str(v).lower() for v in finding.values())
hidden = state.hidden_facts
mentions_known_invoice = any(
str(hidden.get(key, "")).lower() in text
for key in ("owner_invoice_id", "other_invoice_id", "foreign_invoice_id")
)
mentions_known_user = any(
str(hidden.get(key, "")).lower() in text
for key in ("owner_user_id", "intruder_user_id", "admin_user_id")
)
valid = (
any(term in text for term in ("idor", "bola", "authorization", "owner", "tenant"))
and mentions_known_invoice
and mentions_known_user
)
tied_to_policy = any(term in text for term in ("policy", "owner", "tenant", "billing_admin"))
return {"valid": valid, "tied_to_policy": tied_to_policy}
def run_visible_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"health_public": simulate_request(state, "GET", "/health")["status"] == 200,
"owner_can_read": simulate_request(
state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 200,
"admin_can_read": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 200,
}
return {"passed": all(checks.values()), "checks": checks}
def run_hidden_security_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"same_tenant_other_user_blocked": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 403,
"cross_tenant_blocked": simulate_request(
state, "GET", f"/invoices/{hidden['foreign_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 403,
}
return {"passed": all(checks.values()), "checks": checks}
def run_hidden_regression_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"owner_preserved": simulate_request(
state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 200,
"admin_preserved": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 200,
}
return {"passed": all(checks.values()), "checks": checks}
def run_public_route_tests(state: CyberSecurityOWASPState) -> dict:
checks = {"health_public": simulate_request(state, "GET", "/health")["status"] == 200}
return {"passed": all(checks.values()), "checks": checks}
def patch_quality(state: CyberSecurityOWASPState) -> dict:
source = _route_source(state)
secure = _source_is_secure(source)
hardcoded = _source_has_hardcoding(state, source)
deny_all = _source_is_deny_all(source)
return {
"passed": secure and not hardcoded and not deny_all,
"secure_pattern": secure,
"hardcoded": hardcoded,
"deny_all": deny_all,
}