"""Validation suite for the OSINT environment release checklist.

Runs four checks: Hugging Face Space readiness, OpenEnv spec compliance,
baseline reproducibility (via a deterministic fake OpenAI client), and
task/grader coverage across difficulty levels.
"""

from __future__ import annotations

import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from types import SimpleNamespace
from typing import Any

from fastapi.testclient import TestClient

from server import app
from osint_env.baselines.openai_runner import (
    OpenAIBaselineConfig,
    OpenAIBaselineRunner,
    build_action_tools,
)
from osint_env.config import (
    clone_environment_config,
    load_seeding_config,
    load_shared_config,
)
from osint_env.env.environment import OSINTEnvironment
from osint_env.env.openenv_compat import Env
from osint_env.env.reward import compute_answer_reward

README_PATH = Path("README.md")
DOCKERFILE_PATH = Path("Dockerfile")
OPENENV_SPEC_PATH = Path("openenv.yaml")
SHARED_CONFIG_PATH = "datasets/fixed_levels/shared_config_fixed_levels.json"
SEED_FILE_PATH = "datasets/fixed_levels/seed_fixed_levels.json"


@dataclass(slots=True)
class ValidationResult:
    name: str
    passed: bool
    details: dict[str, Any]


def _build_environment() -> OSINTEnvironment:
    """Build a deterministic environment from the fixed-level seed data."""
    shared = load_shared_config(SHARED_CONFIG_PATH)
    env_cfg = clone_environment_config(shared.environment)
    env_cfg.seeding = load_seeding_config(SEED_FILE_PATH)
    env_cfg.llm.provider = "mock"
    return OSINTEnvironment(env_cfg)


def check_hf_space_readiness() -> ValidationResult:
    """Verify the repo carries the metadata and endpoints a HF Space needs."""
    text = README_PATH.read_text(encoding="utf-8")
    has_sdk = "sdk: docker" in text
    has_port = "app_port: 7860" in text
    has_openenv_tag = "- openenv" in text

    client = TestClient(app)
    health = client.get("/healthz")
    dashboard = client.get("/api/environment")
    spec = client.get("/openenv.yaml")

    passed = all(
        [
            README_PATH.exists(),
            DOCKERFILE_PATH.exists(),
            OPENENV_SPEC_PATH.exists(),
            has_sdk,
            has_port,
            has_openenv_tag,
            health.status_code == 200,
            dashboard.status_code == 200,
            spec.status_code == 200,
        ]
    )
    return ValidationResult(
        name="hf_space_readiness",
        passed=passed,
        details={
            "readme_exists": README_PATH.exists(),
            "dockerfile_exists": DOCKERFILE_PATH.exists(),
            "openenv_spec_exists": OPENENV_SPEC_PATH.exists(),
            "has_sdk_docker": has_sdk,
            "has_app_port": has_port,
            "has_openenv_tag": has_openenv_tag,
            "healthz_status": health.status_code,
            "environment_status": dashboard.status_code,
            "openenv_spec_status": spec.status_code,
        },
    )


def check_openenv_spec_compliance() -> ValidationResult:
    """Exercise the OpenEnv surface both in-process and over HTTP."""
    env = _build_environment()
    obs = env.reset()

    client = TestClient(app)
    reset = client.post("/openenv/reset", json={"task_index": 0})
    session_id = reset.json()["session_id"] if reset.status_code == 200 else ""
    step = client.post(
        "/openenv/step",
        json={
            "session_id": session_id,
            "action_type": "ANSWER",
            "payload": {"answer": "unknown"},
        },
    )
    state = client.get(f"/openenv/state/{session_id}") if reset.status_code == 200 else None

    passed = all(
        [
            isinstance(env, Env),
            hasattr(env, "reset"),
            hasattr(env, "step"),
            env.name == "OSINTEnvironment",
            env.state_space == "json-observation",
            # Normalise to list so the check holds whether action_space is a
            # list or a tuple.
            list(env.action_space) == ["CALL_TOOL", "ADD_EDGE", "ANSWER"],
            env.episode_max_length == env.config.max_steps,
            isinstance(obs.task, dict),
            "question" in obs.task,
            reset.status_code == 200,
            step.status_code == 200,
            state is not None and state.status_code == 200,
        ]
    )
    return ValidationResult(
        name="openenv_spec_compliance",
        passed=passed,
        details={
            "env_class": type(env).__name__,
            "state_space": env.state_space,
            "action_space": list(env.action_space),
            "episode_max_length": env.episode_max_length,
            "task_keys": sorted(obs.task.keys()),
            "reset_status": reset.status_code,
            "step_status": step.status_code,
            "state_status": 0 if state is None else state.status_code,
        },
    )


class _FakeMessage:
    """Chat message that always issues a single submit_answer tool call."""

    def __init__(self, answer: str):
        self.content = ""
        self.tool_calls = [
            SimpleNamespace(
                id="fake_tool_call_0",
                function=SimpleNamespace(
                    name="submit_answer",
                    arguments=json.dumps({"answer": answer}),
                ),
            )
        ]


class _FakeCompletion:
    def __init__(self, answer: str):
        self.choices = [SimpleNamespace(message=_FakeMessage(answer))]
        self.usage = SimpleNamespace(prompt_tokens=0, completion_tokens=0, total_tokens=0)
        self.system_fingerprint = "validation_fp"


class _FakeChatCompletions:
    """Deterministic stand-in for the OpenAI chat.completions endpoint.

    Looks up the task referenced in the first user message and replies with
    that task's ground-truth answer, so repeated runs are identical.
    """

    def create(self, **kwargs: Any) -> _FakeCompletion:
        messages = list(kwargs.get("messages", []))
        initial_observation: dict[str, Any] = {}
        for message in messages:
            if message.get("role") == "user":
                try:
                    initial_observation = json.loads(message.get("content", "{}"))
                except json.JSONDecodeError:
                    initial_observation = {}
                break
        task_id = ((initial_observation.get("task") or {}).get("task_id")) or ""
        env = _build_environment()
        task = next((task for task in env.tasks if task.task_id == task_id), None)
        answer = task.answer if task is not None else "unknown"
        return _FakeCompletion(answer)


class _FakeOpenAIClient:
    def __init__(self) -> None:
        self.chat = SimpleNamespace(completions=_FakeChatCompletions())


def _run_fake_baseline_once(output_dir: Path) -> dict[str, Any]:
    config = OpenAIBaselineConfig(
        api_key="validation",
        episodes=3,
        max_steps=4,
        append_leaderboard=False,
        output_path=str(output_dir / "baseline.json"),
        dashboard_path=str(output_dir / "baseline.html"),
        leaderboard_path=str(output_dir / "leaderboard.json"),
        run_name="validation_baseline",
    )
    # Bypass __init__ so no real OpenAI client is constructed, then wire in
    # the deterministic fake before running.
    runner = OpenAIBaselineRunner.__new__(OpenAIBaselineRunner)
    runner.config = config
    runner.client = _FakeOpenAIClient()
    runner.tools = build_action_tools()
    return runner.run()


def check_baseline_reproducibility() -> ValidationResult:
    """Run the fake baseline twice and require identical result signatures."""
    with tempfile.TemporaryDirectory() as left_dir_name, tempfile.TemporaryDirectory() as right_dir_name:
        left = _run_fake_baseline_once(Path(left_dir_name))
        right = _run_fake_baseline_once(Path(right_dir_name))

    def _signature(result: dict[str, Any]) -> dict[str, Any]:
        return {
            "summary": result["summary"],
            "episodes": [
                {
                    "task_id": episode["task_id"],
                    "task_answer": episode["task_answer"],
                    "agent_answer": episode["agent_answer"],
                    "success": episode["success"],
                    "steps": episode["steps"],
                }
                for episode in result["episodes"]
            ],
        }

    left_signature = _signature(left)
    right_signature = _signature(right)
    passed = left_signature == right_signature
    return ValidationResult(
        name="baseline_reproducibility",
        passed=passed,
        details={
            "episodes_checked": len(left_signature["episodes"]),
            "left_signature": left_signature,
            "right_signature": right_signature,
        },
    )


def check_task_and_grader_coverage() -> ValidationResult:
    """Check that one task per difficulty rewards the right answer over a wrong one."""
    env = _build_environment()
    tasks = env.tasks

    grader_checks: list[dict[str, Any]] = []
    distinct_types = sorted({str(task.task_type) for task in tasks})

    # Pick the first task seen for each difficulty, normalising loose labels
    # ("mid"/"m" -> "medium", "high"/"h" -> "hard") and falling back to a
    # positional split when no recognised label is present.
    difficulty_buckets: dict[str, Any] = {}
    for idx, task in enumerate(tasks):
        token = str((task.metadata or {}).get("difficulty", "")).strip().lower()
        if token in {"mid", "m"}:
            token = "medium"
        if token in {"high", "h"}:
            token = "hard"
        if token not in {"easy", "medium", "hard"}:
            if idx < 10:
                token = "easy"
            elif idx < 20:
                token = "medium"
            else:
                token = "hard"
        difficulty_buckets.setdefault(token, task)

    for difficulty in ["easy", "medium", "hard"]:
        task = difficulty_buckets.get(difficulty)
        if task is None:
            continue
        correct = compute_answer_reward(
            proposed_answer=task.answer,
            task=task,
            pred_edges=list(task.supporting_edges),
            tool_outputs=[],
            step_count=1,
            model=env.reward_model,
            difficulty=difficulty,
        )
        wrong = compute_answer_reward(
            proposed_answer="unknown",
            task=task,
            pred_edges=[],
            tool_outputs=[],
            step_count=1,
            model=env.reward_model,
            difficulty=difficulty,
        )
        grader = dict(task.metadata.get("grader", {})) if isinstance(task.metadata, dict) else {}
        grader_checks.append(
            {
                "difficulty": difficulty,
                "task_id": task.task_id,
                "task_type": task.task_type,
                "support_edges": len(task.supporting_edges),
                "has_grader": bool(grader),
                "correct_reward": correct.total,
                "wrong_reward": wrong.total,
                "grader_prefers_correct": correct.total > wrong.total,
            }
        )

    passed = (
        len(tasks) >= 3
        and len(distinct_types) >= 3
        and len(grader_checks) >= 3
        and all(
            row["support_edges"] > 0 and row["grader_prefers_correct"] and row["has_grader"]
            for row in grader_checks
        )
    )
    return ValidationResult(
        name="task_and_grader_coverage",
        passed=passed,
        details={
            "task_count": len(tasks),
            "distinct_task_types": distinct_types,
            "grader_checks": grader_checks,
        },
    )


def run_validation_suite() -> dict[str, Any]:
    """Run every check and aggregate into a single pass/fail report."""
    results = [
        check_hf_space_readiness(),
        check_openenv_spec_compliance(),
        check_baseline_reproducibility(),
        check_task_and_grader_coverage(),
    ]
    passed = all(result.passed for result in results)
    return {
        "passed": passed,
        "checks": [asdict(result) for result in results],
    }
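

if __name__ == "__main__":
    # Illustrative entry point (an addition, not part of the original module):
    # run the full suite and print the aggregate report as JSON, so the file
    # can also be executed directly from the repo root rather than only
    # imported by a test harness.
    print(json.dumps(run_validation_suite(), indent=2))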