Spaces:
Sleeping
Sleeping
File size: 8,491 Bytes
3807ea3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 | """Deterministic validators and hidden checks for CyberSecurity_OWASP."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from .models import CyberSecurityOWASPAction, CyberSecurityOWASPState
BLOCKED_PATH_MARKERS = (
"hidden",
"oracle",
"reward_engine",
"validators.py",
"rewards.py",
".git",
"..",
)
def _workspace(state: CyberSecurityOWASPState) -> Path:
return Path(str(state.hidden_facts["workspace"]))
def _route_source(state: CyberSecurityOWASPState) -> str:
return (_workspace(state) / "app/routes/invoices.py").read_text(encoding="utf-8")
def is_path_allowed(state: CyberSecurityOWASPState, path: str, *, write: bool = False) -> tuple[bool, str]:
normalized = path.replace("\\", "/").lstrip("/")
if any(marker in normalized for marker in BLOCKED_PATH_MARKERS):
return False, "blocked path"
if Path(path).is_absolute():
return False, "absolute host paths are blocked"
editable = set(state.hidden_facts.get("editable_files", []))
if normalized not in editable:
return False, "path is outside editable generated workspace"
if write and normalized.startswith("tests/"):
return False, "visible tests are readable but not patchable"
return True, normalized
def detect_cheating(state: CyberSecurityOWASPState, action: CyberSecurityOWASPAction) -> list[str]:
flags: list[str] = []
args = action.arguments or {}
text = " ".join(str(v) for v in args.values())
for key in (
"owner_user_id",
"intruder_user_id",
"admin_user_id",
"owner_invoice_id",
"other_invoice_id",
"foreign_invoice_id",
"tenant_a",
"tenant_b",
):
value = str(state.hidden_facts.get(key, ""))
if value and value in text and action.tool_name == "patch_file":
flags.append("hardcoded_hidden_identifier")
break
if action.tool_name in {"read_file", "search_code", "patch_file"}:
path_text = str(args.get("path", "")) + " " + str(args.get("query", ""))
if any(marker in path_text.replace("\\", "/") for marker in BLOCKED_PATH_MARKERS):
flags.append("blocked_file_probe")
if action.tool_name == "send_local_request":
target = str(args.get("path", ""))
if target.startswith("http://") or target.startswith("https://"):
flags.append("external_network_attempt")
return sorted(set(flags))
def _source_is_secure(source: str) -> bool:
has_tenant_check = "tenant_id" in source and "actor" in source and "!=" in source
has_owner = "owner_user_id" in source and "actor" in source
has_admin = "is_billing_admin" in source
has_forbidden = '"status": 403' in source or "'status': 403" in source
return has_tenant_check and has_owner and has_admin and has_forbidden
def _source_is_deny_all(source: str) -> bool:
early_unconditional = (
" return {\"status\": 403" in source
and " return {\"status\": 403" in source.split("if invoice[", 1)[0]
)
missing_positive_policy = "owner_user_id" not in source or "is_billing_admin" not in source
return early_unconditional or (
("return {\"status\": 403" in source or "return {'status': 403" in source)
and missing_positive_policy
)
def _source_has_hardcoding(state: CyberSecurityOWASPState, source: str) -> bool:
values = [
str(state.hidden_facts.get(key, ""))
for key in (
"owner_user_id",
"intruder_user_id",
"admin_user_id",
"owner_invoice_id",
"other_invoice_id",
"foreign_invoice_id",
"tenant_a",
"tenant_b",
)
]
return any(value and value in source for value in values)
def simulate_request(
state: CyberSecurityOWASPState, method: str, path: str, user_id: str | None = None
) -> dict[str, Any]:
if method.upper() != "GET":
return {"status": 405, "body": {"detail": "method not allowed"}}
source = _route_source(state)
if path == "/health":
if "def health" not in source or "status\": 200" not in source:
return {"status": 404, "body": {"detail": "health unavailable"}}
return {"status": 200, "body": {"status": "ok"}}
if not path.startswith("/invoices/"):
return {"status": 404, "body": {"detail": "route not found"}}
invoice_id = path.rsplit("/", 1)[-1]
users = state.hidden_facts["users"]
invoices = state.hidden_facts["invoices"]
actor = users.get(user_id or "")
invoice = invoices.get(invoice_id)
if actor is None:
return {"status": 401, "body": {"detail": "unknown user"}}
if invoice is None:
return {"status": 404, "body": {"detail": "invoice not found"}}
if _source_is_deny_all(source):
return {"status": 403, "body": {"detail": "forbidden"}}
if _source_is_secure(source):
same_tenant = invoice["tenant_id"] == actor["tenant_id"]
allowed = invoice["owner_user_id"] == actor["user_id"] or "billing_admin" in actor["roles"]
if same_tenant and allowed:
return {"status": 200, "body": invoice}
return {"status": 403, "body": {"detail": "forbidden"}}
return {"status": 200, "body": invoice}
def verify_finding(state: CyberSecurityOWASPState, finding: dict) -> dict:
text = " ".join(str(v).lower() for v in finding.values())
hidden = state.hidden_facts
mentions_known_invoice = any(
str(hidden.get(key, "")).lower() in text
for key in ("owner_invoice_id", "other_invoice_id", "foreign_invoice_id")
)
mentions_known_user = any(
str(hidden.get(key, "")).lower() in text
for key in ("owner_user_id", "intruder_user_id", "admin_user_id")
)
valid = (
any(term in text for term in ("idor", "bola", "authorization", "owner", "tenant"))
and mentions_known_invoice
and mentions_known_user
)
tied_to_policy = any(term in text for term in ("policy", "owner", "tenant", "billing_admin"))
return {"valid": valid, "tied_to_policy": tied_to_policy}
def run_visible_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"health_public": simulate_request(state, "GET", "/health")["status"] == 200,
"owner_can_read": simulate_request(
state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 200,
"admin_can_read": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 200,
}
return {"passed": all(checks.values()), "checks": checks}
def run_hidden_security_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"same_tenant_other_user_blocked": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 403,
"cross_tenant_blocked": simulate_request(
state, "GET", f"/invoices/{hidden['foreign_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 403,
}
return {"passed": all(checks.values()), "checks": checks}
def run_hidden_regression_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"owner_preserved": simulate_request(
state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 200,
"admin_preserved": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 200,
}
return {"passed": all(checks.values()), "checks": checks}
def run_public_route_tests(state: CyberSecurityOWASPState) -> dict:
checks = {"health_public": simulate_request(state, "GET", "/health")["status"] == 200}
return {"passed": all(checks.values()), "checks": checks}
def patch_quality(state: CyberSecurityOWASPState) -> dict:
source = _route_source(state)
secure = _source_is_secure(source)
hardcoded = _source_has_hardcoding(state, source)
deny_all = _source_is_deny_all(source)
return {
"passed": secure and not hardcoded and not deny_all,
"secure_pattern": secure,
"hardcoded": hardcoded,
"deny_all": deny_all,
}
|