# OSINT / src/osint_env/validation.py
# Last commit: "fixed tasks" (515f8c0) -- Siddeshwar1625
from __future__ import annotations
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from types import SimpleNamespace
from typing import Any
from fastapi.testclient import TestClient
from server import app
from osint_env.baselines.openai_runner import OpenAIBaselineConfig, OpenAIBaselineRunner, build_action_tools
from osint_env.config import clone_environment_config, load_seeding_config, load_shared_config
from osint_env.env.environment import OSINTEnvironment
from osint_env.env.openenv_compat import Env
from osint_env.env.reward import compute_answer_reward
# Repository artifacts inspected by check_hf_space_readiness(). These are
# relative paths, so they resolve against the current working directory.
README_PATH = Path("README.md")
DOCKERFILE_PATH = Path("Dockerfile")
OPENENV_SPEC_PATH = Path("openenv.yaml")
# Shared configuration and seed files loaded by _build_environment().
SHARED_CONFIG_PATH = "datasets/fixed_levels/shared_config_fixed_levels.json"
SEED_FILE_PATH = "datasets/fixed_levels/seed_fixed_levels.json"
@dataclass(slots=True)
class ValidationResult:
    """Outcome of a single validation check."""

    # Identifier of the check that produced this result.
    name: str
    # True when every condition evaluated by the check held.
    passed: bool
    # Raw values gathered by the check, reported alongside the verdict.
    details: dict[str, Any]
def _build_environment() -> OSINTEnvironment:
    """Create an ``OSINTEnvironment`` from the fixed-level config and seed files.

    The environment section of the shared config is cloned, its seeding is
    replaced with the contents of ``SEED_FILE_PATH``, and the LLM provider is
    set to ``"mock"`` before the environment is constructed.
    """
    shared_config = load_shared_config(SHARED_CONFIG_PATH)
    environment_config = clone_environment_config(shared_config.environment)
    environment_config.seeding = load_seeding_config(SEED_FILE_PATH)
    environment_config.llm.provider = "mock"
    return OSINTEnvironment(environment_config)
def check_hf_space_readiness() -> ValidationResult:
    """Check the repository files and HTTP endpoints needed to serve the app.

    The check passes only when all of the following hold:

    * ``README.md``, ``Dockerfile`` and ``openenv.yaml`` exist;
    * the README contains the strings ``sdk: docker``, ``app_port: 7860``
      and ``- openenv``;
    * ``GET /healthz``, ``GET /api/environment`` and ``GET /openenv.yaml``
      on the in-process app each return HTTP 200.

    Returns:
        A ``ValidationResult`` named ``"hf_space_readiness"`` whose details
        hold every individual observation.
    """
    readme_exists = README_PATH.exists()
    dockerfile_exists = DOCKERFILE_PATH.exists()
    openenv_spec_exists = OPENENV_SPEC_PATH.exists()

    # A missing README must surface as a failed check (readme_exists=False),
    # not as a FileNotFoundError raised before any result can be built.
    text = README_PATH.read_text(encoding="utf-8") if readme_exists else ""
    has_sdk = "sdk: docker" in text
    has_port = "app_port: 7860" in text
    has_openenv_tag = "- openenv" in text

    client = TestClient(app)
    health = client.get("/healthz")
    dashboard = client.get("/api/environment")
    spec = client.get("/openenv.yaml")

    passed = all(
        [
            readme_exists,
            dockerfile_exists,
            openenv_spec_exists,
            has_sdk,
            has_port,
            has_openenv_tag,
            health.status_code == 200,
            dashboard.status_code == 200,
            spec.status_code == 200,
        ]
    )
    return ValidationResult(
        name="hf_space_readiness",
        passed=passed,
        details={
            "readme_exists": readme_exists,
            "dockerfile_exists": dockerfile_exists,
            "openenv_spec_exists": openenv_spec_exists,
            "has_sdk_docker": has_sdk,
            "has_app_port": has_port,
            "has_openenv_tag": has_openenv_tag,
            "healthz_status": health.status_code,
            "environment_status": dashboard.status_code,
            "openenv_spec_status": spec.status_code,
        },
    )
def check_openenv_spec_compliance() -> ValidationResult:
    """Check the environment object and the ``/openenv`` HTTP endpoints.

    A locally built environment is inspected for its type, name, state and
    action spaces, episode length and the shape of the first observation's
    task. The in-process app is then exercised with a reset, one ``ANSWER``
    step and a state lookup for the session returned by the reset.

    Unexpected shapes (a non-dict task, a reset response without a usable
    ``session_id``) are reported as a failed check instead of raising.

    Returns:
        A ``ValidationResult`` named ``"openenv_spec_compliance"``.
    """
    env = _build_environment()
    obs = env.reset()
    task = obs.task
    task_is_dict = isinstance(task, dict)

    client = TestClient(app)
    reset = client.post("/openenv/reset", json={"task_index": 0})

    # Extract the session id defensively: a 200 response with a non-JSON body
    # or without "session_id" should fail the check, not crash it.
    session_id = None
    if reset.status_code == 200:
        try:
            reset_payload = reset.json()
        except ValueError:
            reset_payload = None
        if isinstance(reset_payload, dict):
            session_id = reset_payload.get("session_id")

    # The step request is always sent so its status is available in the
    # details; without a session it carries an empty session id.
    step = client.post(
        "/openenv/step",
        json={
            "session_id": session_id if session_id is not None else "",
            "action_type": "ANSWER",
            "payload": {"answer": "unknown"},
        },
    )
    state = client.get(f"/openenv/state/{session_id}") if session_id is not None else None

    passed = all(
        [
            isinstance(env, Env),
            hasattr(env, "reset"),
            hasattr(env, "step"),
            env.name == "OSINTEnvironment",
            env.state_space == "json-observation",
            env.action_space == ["CALL_TOOL", "ADD_EDGE", "ANSWER"],
            env.episode_max_length == env.config.max_steps,
            task_is_dict,
            # Only probe for the key once the task is known to be a dict.
            task_is_dict and "question" in task,
            reset.status_code == 200,
            step.status_code == 200,
            state is not None and state.status_code == 200,
        ]
    )
    return ValidationResult(
        name="openenv_spec_compliance",
        passed=passed,
        details={
            "env_class": type(env).__name__,
            "state_space": env.state_space,
            "action_space": list(env.action_space),
            "episode_max_length": env.episode_max_length,
            "task_keys": sorted(task) if task_is_dict else [],
            "reset_status": reset.status_code,
            "step_status": step.status_code,
            # 0 marks "state endpoint was never queried".
            "state_status": 0 if state is None else state.status_code,
        },
    )
class _FakeMessage:
    """Stand-in for a chat message that carries one ``submit_answer`` tool call."""

    def __init__(self, answer: str):
        # The tool-call arguments are a JSON-encoded string.
        encoded_arguments = json.dumps({"answer": answer})
        function = SimpleNamespace(name="submit_answer", arguments=encoded_arguments)
        tool_call = SimpleNamespace(id="fake_tool_call_0", function=function)
        self.content = ""
        self.tool_calls = [tool_call]
class _FakeCompletion:
    """Stand-in for a chat completion response holding a single choice."""

    def __init__(self, answer: str):
        only_choice = SimpleNamespace(message=_FakeMessage(answer))
        zero_usage = SimpleNamespace(prompt_tokens=0, completion_tokens=0, total_tokens=0)
        self.choices = [only_choice]
        self.usage = zero_usage
        self.system_fingerprint = "validation_fp"
class _FakeChatCompletions:
    """Fake ``chat.completions`` endpoint that replies with the task's stored answer.

    The task is identified from the first ``user`` message, whose content is
    expected to be a JSON document with a ``task.task_id`` entry.
    """

    def create(self, **kwargs: Any) -> _FakeCompletion:
        """Return a completion that submits the answer of the referenced task.

        Args:
            **kwargs: Request arguments; only ``messages`` is read.

        Returns:
            A ``_FakeCompletion`` carrying the matching task's answer, or
            ``"unknown"`` when no task id can be extracted or no task in a
            freshly built environment matches it.
        """
        task_id = self._extract_task_id(kwargs.get("messages") or [])
        env = _build_environment()
        matching_task = next(
            (candidate for candidate in env.tasks if candidate.task_id == task_id),
            None,
        )
        answer = matching_task.answer if matching_task is not None else "unknown"
        return _FakeCompletion(answer)

    @staticmethod
    def _extract_task_id(messages: Any) -> Any:
        """Pull ``task.task_id`` out of the first user message, or return ``""``."""
        first_user_message = next(
            (
                message
                for message in messages
                if isinstance(message, dict) and message.get("role") == "user"
            ),
            None,
        )
        if first_user_message is None:
            return ""
        try:
            observation = json.loads(first_user_message.get("content", "{}"))
        except (TypeError, ValueError):
            # TypeError: content is not str/bytes (e.g. None or a list).
            # ValueError: covers json.JSONDecodeError for malformed text.
            return ""
        if not isinstance(observation, dict):
            return ""
        task_info = observation.get("task")
        if not isinstance(task_info, dict):
            return ""
        return task_info.get("task_id") or ""
class _FakeOpenAIClient:
    """Minimal client object exposing only ``chat.completions``."""

    def __init__(self) -> None:
        fake_completions = _FakeChatCompletions()
        self.chat = SimpleNamespace(completions=fake_completions)
def _run_fake_baseline_once(output_dir: Path) -> dict[str, Any]:
    """Run the baseline runner once with the fake client and return its report.

    Args:
        output_dir: Directory under which the runner's output, dashboard and
            leaderboard paths are configured.

    Returns:
        Whatever ``OpenAIBaselineRunner.run()`` returns.
    """
    artifact_paths = {
        "output_path": str(output_dir / "baseline.json"),
        "dashboard_path": str(output_dir / "baseline.html"),
        "leaderboard_path": str(output_dir / "leaderboard.json"),
    }
    baseline_config = OpenAIBaselineConfig(
        api_key="validation",
        episodes=3,
        max_steps=4,
        append_leaderboard=False,
        run_name="validation_baseline",
        **artifact_paths,
    )

    # Allocate the runner without invoking __init__ and wire its attributes by
    # hand, so the fake client is used in place of whatever __init__ would set
    # up (presumably a real OpenAI client -- confirm against the runner).
    baseline_runner = OpenAIBaselineRunner.__new__(OpenAIBaselineRunner)
    baseline_runner.config = baseline_config
    baseline_runner.client = _FakeOpenAIClient()
    baseline_runner.tools = build_action_tools()
    return baseline_runner.run()
def _baseline_signature(report: dict[str, Any]) -> dict[str, Any]:
    """Reduce a baseline report to the fields compared for reproducibility."""
    compared_fields = ("task_id", "task_answer", "agent_answer", "success", "steps")
    return {
        "summary": report["summary"],
        "episodes": [
            {field: episode[field] for field in compared_fields}
            for episode in report["episodes"]
        ],
    }


def check_baseline_reproducibility() -> ValidationResult:
    """Run the fake baseline twice and check both runs agree.

    Each run gets its own temporary output directory. The runs are compared
    on their summary and, per episode, on task id, task answer, agent answer,
    success flag and step count.

    Returns:
        A ``ValidationResult`` named ``"baseline_reproducibility"`` whose
        details include both signatures.
    """
    with tempfile.TemporaryDirectory() as left_dir_name, tempfile.TemporaryDirectory() as right_dir_name:
        left = _run_fake_baseline_once(Path(left_dir_name))
        right = _run_fake_baseline_once(Path(right_dir_name))

    # Both sides go through the same helper so the compared fields cannot
    # drift apart between the two runs.
    left_signature = _baseline_signature(left)
    right_signature = _baseline_signature(right)

    return ValidationResult(
        name="baseline_reproducibility",
        passed=left_signature == right_signature,
        details={
            "episodes_checked": len(left_signature["episodes"]),
            "left_signature": left_signature,
            "right_signature": right_signature,
        },
    )
def _resolve_difficulty(task: Any, index: int) -> str:
    """Return ``"easy"``, ``"medium"`` or ``"hard"`` for *task*.

    The ``difficulty`` metadata entry is normalised (trimmed, lower-cased,
    aliases ``mid``/``m`` and ``high``/``h`` expanded). When that does not
    yield a known bucket, the task's position decides: indices below 10 are
    easy, below 20 medium, everything else hard.
    """
    metadata = task.metadata if isinstance(task.metadata, dict) else {}
    token = str(metadata.get("difficulty", "")).strip().lower()
    aliases = {"mid": "medium", "m": "medium", "high": "hard", "h": "hard"}
    token = aliases.get(token, token)
    if token in {"easy", "medium", "hard"}:
        return token
    if index < 10:
        return "easy"
    if index < 20:
        return "medium"
    return "hard"


def _grader_config(task: Any) -> dict[str, Any]:
    """Return the task's ``grader`` metadata as a dict, or ``{}`` if unusable."""
    metadata = task.metadata
    if not isinstance(metadata, dict):
        return {}
    try:
        return dict(metadata.get("grader") or {})
    except (TypeError, ValueError):
        # The stored value cannot be turned into a dict; treat it as absent
        # so the check reports has_grader=False instead of crashing.
        return {}


def check_task_and_grader_coverage() -> ValidationResult:
    """Check task variety and that the reward prefers correct answers.

    The first task found in each difficulty bucket is scored twice with
    ``compute_answer_reward``: once with its own answer and supporting edges,
    once with the answer ``"unknown"`` and no edges.

    The check passes when there are at least three tasks, at least three
    distinct task types, all three buckets were graded, and every graded task
    has supporting edges, grader metadata, and a strictly higher reward for
    the correct answer.

    Returns:
        A ``ValidationResult`` named ``"task_and_grader_coverage"``.
    """
    env = _build_environment()
    tasks = env.tasks
    distinct_types = sorted({str(task.task_type) for task in tasks})

    # Keep only the first task encountered for each difficulty bucket.
    difficulty_buckets: dict[str, Any] = {}
    for idx, task in enumerate(tasks):
        difficulty_buckets.setdefault(_resolve_difficulty(task, idx), task)

    grader_checks: list[dict[str, Any]] = []
    for difficulty in ("easy", "medium", "hard"):
        task = difficulty_buckets.get(difficulty)
        if task is None:
            continue
        correct = compute_answer_reward(
            proposed_answer=task.answer,
            task=task,
            pred_edges=list(task.supporting_edges),
            tool_outputs=[],
            step_count=1,
            model=env.reward_model,
            difficulty=difficulty,
        )
        wrong = compute_answer_reward(
            proposed_answer="unknown",
            task=task,
            pred_edges=[],
            tool_outputs=[],
            step_count=1,
            model=env.reward_model,
            difficulty=difficulty,
        )
        grader_checks.append(
            {
                "difficulty": difficulty,
                "task_id": task.task_id,
                "task_type": task.task_type,
                "support_edges": len(task.supporting_edges),
                "has_grader": bool(_grader_config(task)),
                "correct_reward": correct.total,
                "wrong_reward": wrong.total,
                "grader_prefers_correct": correct.total > wrong.total,
            }
        )

    passed = (
        len(tasks) >= 3
        and len(distinct_types) >= 3
        and len(grader_checks) >= 3
        and all(
            row["support_edges"] > 0 and row["grader_prefers_correct"] and row["has_grader"]
            for row in grader_checks
        )
    )
    return ValidationResult(
        name="task_and_grader_coverage",
        passed=passed,
        details={
            "task_count": len(tasks),
            "distinct_task_types": distinct_types,
            "grader_checks": grader_checks,
        },
    )
def run_validation_suite() -> dict[str, Any]:
    """Run every validation check in order and aggregate the outcomes.

    Returns:
        A dict with ``"passed"`` (True only when every check passed) and
        ``"checks"`` (each ``ValidationResult`` converted with ``asdict``).
    """
    check_functions = (
        check_hf_space_readiness,
        check_openenv_spec_compliance,
        check_baseline_reproducibility,
        check_task_and_grader_coverage,
    )
    outcomes = [run_check() for run_check in check_functions]
    return {
        "passed": all(outcome.passed for outcome in outcomes),
        "checks": [asdict(outcome) for outcome in outcomes],
    }