Spaces:
Sleeping
Sleeping
File size: 10,740 Bytes
3807ea3 f3080d1 3807ea3 be8eade 3807ea3 be8eade 3807ea3 be8eade 3807ea3 be8eade 3807ea3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 | """Deterministic validators and hidden checks for CyberSecurity_OWASP."""
from __future__ import annotations
from pathlib import Path
from typing import Any
try:
from .models import CyberSecurityOWASPAction, CyberSecurityOWASPState
except ImportError: # pragma: no cover
from models import CyberSecurityOWASPAction, CyberSecurityOWASPState
BLOCKED_PATH_MARKERS = (
"hidden",
"oracle",
"reward_engine",
"validators.py",
"rewards.py",
".git",
"..",
)
def _workspace(state: CyberSecurityOWASPState) -> Path:
return Path(str(state.hidden_facts["workspace"]))
def _route_source(state: CyberSecurityOWASPState) -> str:
return (_workspace(state) / "app/routes/invoices.py").read_text(encoding="utf-8")
def is_path_allowed(state: CyberSecurityOWASPState, path: str, *, write: bool = False) -> tuple[bool, str]:
normalized = path.replace("\\", "/").lstrip("/")
if any(marker in normalized for marker in BLOCKED_PATH_MARKERS):
return False, "blocked path"
if Path(path).is_absolute():
return False, "absolute host paths are blocked"
editable = set(state.hidden_facts.get("editable_files", []))
if normalized not in editable:
return False, "path is outside editable generated workspace"
if write and normalized.startswith("tests/"):
return False, "visible tests are readable but not patchable"
return True, normalized
def detect_cheating(state: CyberSecurityOWASPState, action: CyberSecurityOWASPAction) -> list[str]:
flags: list[str] = []
args = action.arguments or {}
text = " ".join(str(v) for v in args.values())
for key in (
"owner_user_id",
"intruder_user_id",
"admin_user_id",
"owner_invoice_id",
"other_invoice_id",
"foreign_invoice_id",
"tenant_a",
"tenant_b",
):
value = str(state.hidden_facts.get(key, ""))
if value and value in text and action.tool_name == "patch_file":
flags.append("hardcoded_hidden_identifier")
break
if action.tool_name in {"read_file", "search_code", "patch_file"}:
path_text = str(args.get("path", "")) + " " + str(args.get("query", ""))
normalized = path_text.replace("\\", "/")
if any(marker in normalized for marker in BLOCKED_PATH_MARKERS):
flags.append("blocked_file_probe")
if ".." in normalized or Path(str(args.get("path", ""))).is_absolute():
flags.append("host_path_escape")
if action.tool_name == "patch_file" and normalized.lstrip("/").startswith("tests/"):
flags.append("test_file_edit_attempt")
if action.tool_name == "send_local_request":
target = str(args.get("path", ""))
if target.startswith("http://") or target.startswith("https://"):
flags.append("external_network_attempt")
return sorted(set(flags))
def _source_is_secure(source: str) -> bool:
has_tenant_check = "tenant_id" in source and "actor" in source and "!=" in source
has_owner = "owner_user_id" in source and "actor" in source
has_admin = "is_billing_admin" in source
has_forbidden = '"status": 403' in source or "'status': 403" in source
return has_tenant_check and has_owner and has_admin and has_forbidden
def _source_is_deny_all(source: str) -> bool:
early_unconditional = (
" return {\"status\": 403" in source
and " return {\"status\": 403" in source.split("if invoice[", 1)[0]
)
missing_positive_policy = "owner_user_id" not in source or "is_billing_admin" not in source
return early_unconditional or (
("return {\"status\": 403" in source or "return {'status': 403" in source)
and missing_positive_policy
)
def _source_has_hardcoding(state: CyberSecurityOWASPState, source: str) -> bool:
values = [
str(state.hidden_facts.get(key, ""))
for key in (
"owner_user_id",
"intruder_user_id",
"admin_user_id",
"owner_invoice_id",
"other_invoice_id",
"foreign_invoice_id",
"tenant_a",
"tenant_b",
)
]
return any(value and value in source for value in values)
def simulate_request(
state: CyberSecurityOWASPState, method: str, path: str, user_id: str | None = None
) -> dict[str, Any]:
if method.upper() != "GET":
return {"status": 405, "body": {"detail": "method not allowed"}}
source = _route_source(state)
if path == "/health":
if "def health" not in source or "status\": 200" not in source:
return {"status": 404, "body": {"detail": "health unavailable"}}
return {"status": 200, "body": {"status": "ok"}}
if not path.startswith("/invoices/"):
return {"status": 404, "body": {"detail": "route not found"}}
invoice_id = path.rsplit("/", 1)[-1]
users = state.hidden_facts["users"]
invoices = state.hidden_facts["invoices"]
actor = users.get(user_id or "")
invoice = invoices.get(invoice_id)
if actor is None:
return {"status": 401, "body": {"detail": "unknown user"}}
if invoice is None:
return {"status": 404, "body": {"detail": "invoice not found"}}
if _source_is_deny_all(source):
return {"status": 403, "body": {"detail": "forbidden"}}
if _source_is_secure(source):
same_tenant = invoice["tenant_id"] == actor["tenant_id"]
allowed = invoice["owner_user_id"] == actor["user_id"] or "billing_admin" in actor["roles"]
if same_tenant and allowed:
return {"status": 200, "body": invoice}
return {"status": 403, "body": {"detail": "forbidden"}}
return {"status": 200, "body": invoice}
def verify_finding(state: CyberSecurityOWASPState, finding: dict) -> dict:
text = " ".join(str(v).lower() for v in finding.values())
hidden = state.hidden_facts
route = str(finding.get("route", "")).lower()
bug_class = str(finding.get("bug_class", "")).lower()
policy_rule = str(finding.get("violated_policy_rule", finding.get("policy_rule", ""))).lower()
evidence_ids = {
str(item)
for item in finding.get("evidence_trace_ids", [])
if str(item).strip()
}
mentions_known_invoice = any(
str(hidden.get(key, "")).lower() in text
for key in ("owner_invoice_id", "other_invoice_id", "foreign_invoice_id")
)
mentions_known_user = any(
str(hidden.get(key, "")).lower() in text
for key in ("owner_user_id", "intruder_user_id", "admin_user_id")
)
valid = (
any(term in text for term in ("idor", "bola", "authorization", "owner", "tenant"))
and mentions_known_invoice
and mentions_known_user
)
tied_to_policy = any(term in text for term in ("policy", "owner", "tenant", "billing_admin"))
correct_route = "/invoices" in route or "/invoices" in text
correct_bug_class = any(
term in bug_class or term in text
for term in ("idor", "bola", "authorization", "ownership", "tenant")
)
correct_policy = any(term in policy_rule for term in ("owner", "tenant", "billing_admin"))
valid_evidence = _has_valid_trace_evidence(state, evidence_ids)
structured_valid = correct_route and correct_bug_class and correct_policy and valid_evidence
return {
"valid": valid or structured_valid,
"tied_to_policy": tied_to_policy or correct_policy,
"correct_route": correct_route,
"correct_bug_class": correct_bug_class,
"correct_policy_rule": correct_policy,
"valid_local_evidence": valid_evidence,
}
def _has_valid_trace_evidence(state: CyberSecurityOWASPState, evidence_ids: set[str]) -> bool:
if not evidence_ids:
return False
hidden = state.hidden_facts
for trace in state.request_trace:
if str(trace.get("trace_id")) not in evidence_ids:
continue
path = str(trace.get("path", ""))
user_id = str(trace.get("user_id", ""))
status = int(trace.get("status", 0) or 0)
if (
hidden.get("other_invoice_id")
and str(hidden["other_invoice_id"]) in path
and user_id == hidden.get("owner_user_id")
and status == 200
):
return True
if bool(trace.get("unauthorized_success", False)):
return True
return False
def run_visible_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"health_public": simulate_request(state, "GET", "/health")["status"] == 200,
"owner_can_read": simulate_request(
state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 200,
"admin_can_read": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 200,
}
return {"passed": all(checks.values()), "checks": checks}
def run_hidden_security_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"same_tenant_other_user_blocked": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 403,
"cross_tenant_blocked": simulate_request(
state, "GET", f"/invoices/{hidden['foreign_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 403,
}
return {"passed": all(checks.values()), "checks": checks}
def run_hidden_regression_tests(state: CyberSecurityOWASPState) -> dict:
hidden = state.hidden_facts
checks = {
"owner_preserved": simulate_request(
state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"]
)["status"]
== 200,
"admin_preserved": simulate_request(
state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"]
)["status"]
== 200,
}
return {"passed": all(checks.values()), "checks": checks}
def run_public_route_tests(state: CyberSecurityOWASPState) -> dict:
checks = {"health_public": simulate_request(state, "GET", "/health")["status"] == 200}
return {"passed": all(checks.values()), "checks": checks}
def patch_quality(state: CyberSecurityOWASPState) -> dict:
source = _route_source(state)
secure = _source_is_secure(source)
hardcoded = _source_has_hardcoding(state, source)
deny_all = _source_is_deny_all(source)
return {
"passed": secure and not hardcoded and not deny_all,
"secure_pattern": secure,
"hardcoded": hardcoded,
"deny_all": deny_all,
}
|