from __future__ import annotations import json import os from pathlib import Path from typing import Any from osint_env.agents.single_agent import SingleAgentRunner from osint_env.agents.swarm_agent import SwarmAgentRunner from osint_env.config import clone_environment_config, load_seeding_config, load_shared_config from osint_env.domain.models import EnvironmentConfig from osint_env.env.environment import OSINTEnvironment from osint_env.env.reward import compute_graph_f1 from osint_env.eval.leaderboard import append_leaderboard_record, load_leaderboard from osint_env.eval.metrics import EvalMetrics from osint_env.llm import build_llm_client from osint_env.viz import export_dashboard CONFIG_PATH = os.getenv("CONFIG_PATH", "datasets/fixed_levels/shared_config_fixed_levels.json") SEED_FILE = os.getenv("SEED_FILE", "datasets/fixed_levels/seed_fixed_levels.json") AGENT_MODE = os.getenv("AGENT_MODE", "swarm") LLM_PROVIDER = os.getenv("LLM_PROVIDER", "openai") MODEL_NAME = os.getenv("MODEL_NAME", "gpt-5.4") OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "") OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") OPENAI_API_KEY_ENV = os.getenv("OPENAI_API_KEY_ENV", "OPENAI_API_KEY") API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1") API_KEY = os.getenv("API_KEY", "") HF_SPACE_URL = os.getenv("HF_SPACE_URL", "") HF_TOKEN = os.getenv("HF_TOKEN","") LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "") LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", "0")) EPISODES = int(os.getenv("EPISODES", "1")) SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.67")) TASK_INDICES_RAW = os.getenv("TASK_INDICES", "") DATASET_MODE = os.getenv("DATASET_MODE", "") METAQA_ROOT = os.getenv("METAQA_ROOT", "") METAQA_KB_PATH = os.getenv("METAQA_KB_PATH", "") METAQA_VARIANT = os.getenv("METAQA_VARIANT", "") METAQA_HOPS_RAW = os.getenv("METAQA_HOPS", "") METAQA_SPLITS_RAW = os.getenv("METAQA_SPLITS", "") WRITE_BENCHMARK_ARTIFACTS = os.getenv("WRITE_BENCHMARK_ARTIFACTS", "1").strip().lower() in { "1", "true", "yes", "y", "on", } LEADERBOARD_PATH = os.getenv("LEADERBOARD_PATH", "datasets/fixed_levels/leaderboard_fixed_levels.json") DASHBOARD_PATH = os.getenv("DASHBOARD_PATH", "datasets/fixed_levels/dashboard_fixed_levels.html") RUN_NAME = os.getenv("RUN_NAME", "fixed_levels_qwen_swarm") BENCHMARK = "osint-openenv" TASK_NAME = "fixed_levels_easy_mid_hard" def _parse_task_indices(raw: str) -> list[int]: out: list[int] = [] for token in str(raw or "").split(","): stripped = token.strip() if not stripped: continue try: out.append(int(stripped)) except ValueError: continue return out def _parse_csv_tokens(raw: str) -> list[str]: return [token.strip() for token in str(raw or "").split(",") if token.strip()] def _normalize_ollama_base_url(url: str) -> str: normalized = str(url or "").strip().rstrip("/") if normalized.endswith("/v1"): normalized = normalized[:-3].rstrip("/") return normalized or "http://127.0.0.1:11434" def _normalize_openai_base_url(url: str) -> str: normalized = str(url or "").strip().rstrip("/") if not normalized: return "" if normalized.endswith("/v1"): return normalized return f"{normalized}/v1" TASK_INDICES = _parse_task_indices(TASK_INDICES_RAW) def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None: error_text = "null" if error is None else str(error) print( f"[STEP] step={step} action={action} reward={reward:.2f} done={str(bool(done)).lower()} error={error_text}", flush=True, ) def log_end(task: str, success: bool, steps: int, score: float, rewards: list[float]) -> None: rewards_text = ",".join(f"{value:.2f}" for value in rewards) print( f"[END] success={str(bool(success)).lower()} steps={steps} score={score:.2f} rewards={rewards_text}", flush=True, ) def _looks_like_placeholder_api_key(value: str) -> bool: token = str(value or "").strip().lower() if not token: return True placeholder_markers = [ "your_openai_api_key", "your-key", "your_key", "your real", "real-openai-key", "replace-me", "changeme", "example", "", ] if token.startswith("your_") or token.startswith("sk-your-"): return True return any(marker in token for marker in placeholder_markers) def _format_action(action: dict[str, Any]) -> str: action_type = str(action.get("action_type", "")).upper() payload = dict(action.get("payload", {})) if action_type == "ANSWER": return f"answer({str(payload.get('answer', 'unknown')).strip()})" if action_type == "ADD_EDGE": try: conf = float(payload.get("confidence", 1.0)) except (TypeError, ValueError): conf = 1.0 return ( "add_edge(" f"{payload.get('src', '')}," f"{payload.get('rel', '')}," f"{payload.get('dst', '')}," f"{conf:.2f}" ")" ) tool_name = str(payload.get("tool_name", "tool")).strip() or "tool" args = payload.get("args", {}) if not isinstance(args, dict) or not args: return f"{tool_name}()" args_text = ",".join(f"{key}={value}" for key, value in sorted(args.items())) return f"{tool_name}({args_text})" def _assistant_tool_call_id(message: dict[str, Any]) -> str | None: tool_calls = list(message.get("tool_calls", [])) if not tool_calls: return None tool_call_id = tool_calls[0].get("id") return str(tool_call_id) if tool_call_id else None def _tool_result_message(assistant_message: dict[str, Any], result: dict[str, Any]) -> dict[str, Any] | None: tool_call_id = _assistant_tool_call_id(assistant_message) if not tool_call_id: return None return { "role": "tool", "tool_call_id": tool_call_id, "content": json.dumps(result, sort_keys=True), } def _resolve_environment_config() -> EnvironmentConfig: shared = load_shared_config(CONFIG_PATH) env_cfg = clone_environment_config(shared.environment) if SEED_FILE and Path(SEED_FILE).exists(): env_cfg.seeding = load_seeding_config(SEED_FILE) mode = AGENT_MODE.strip().lower() if mode == "single": env_cfg.swarm.enabled = False elif mode == "swarm": env_cfg.swarm.enabled = True # Inference submissions must route all calls through OpenAI-compatible client config. env_cfg.llm.provider = "openai" env_cfg.llm.model = MODEL_NAME.strip() if LLM_TIMEOUT_SECONDS > 0: env_cfg.llm.timeout_seconds = int(LLM_TIMEOUT_SECONDS) # Evaluation harnesses inject API_BASE_URL/HF_TOKEN for proxy-enforced requests. resolved_openai_base = API_BASE_URL.strip() or OPENAI_BASE_URL.strip() or HF_SPACE_URL.strip() if resolved_openai_base: env_cfg.llm.openai_base_url = _normalize_openai_base_url(resolved_openai_base) if HF_TOKEN.strip(): env_cfg.llm.openai_api_key = HF_TOKEN.strip() elif API_KEY.strip(): env_cfg.llm.openai_api_key = API_KEY.strip() elif OPENAI_API_KEY.strip(): env_cfg.llm.openai_api_key = OPENAI_API_KEY.strip() if OPENAI_API_KEY_ENV.strip(): env_cfg.llm.openai_api_key_env = OPENAI_API_KEY_ENV.strip() dataset_mode = DATASET_MODE.strip().lower() if dataset_mode in {"canonical", "metaqa"}: env_cfg.dataset_mode = dataset_mode if METAQA_ROOT.strip(): env_cfg.metaqa_root = METAQA_ROOT.strip() if METAQA_KB_PATH.strip(): env_cfg.metaqa_kb_path = METAQA_KB_PATH.strip() metaqa_variant = METAQA_VARIANT.strip().lower() if metaqa_variant in {"vanilla", "ntm"}: env_cfg.metaqa_variant = metaqa_variant metaqa_hops = _parse_csv_tokens(METAQA_HOPS_RAW) if metaqa_hops: env_cfg.metaqa_hops = metaqa_hops metaqa_splits = _parse_csv_tokens(METAQA_SPLITS_RAW) if metaqa_splits: env_cfg.metaqa_splits = metaqa_splits return env_cfg def _runner_for(env: OSINTEnvironment, llm: Any) -> SingleAgentRunner | SwarmAgentRunner: if env.config.swarm.enabled: return SwarmAgentRunner(env=env, llm=llm) return SingleAgentRunner(env=env, llm=llm) def _normalize_difficulty(value: str) -> str: token = str(value or "").strip().lower() if token in {"easy", "e"}: return "easy" if token in {"mid", "medium", "m"}: return "medium" if token in {"high", "hard", "h"}: return "hard" return "hard" def _task_difficulty(env: OSINTEnvironment, task_index: int) -> str: idx = int(task_index) % max(1, len(env.tasks)) task = env.tasks[idx] if isinstance(task.metadata, dict) and "difficulty" in task.metadata: return _normalize_difficulty(str(task.metadata.get("difficulty", ""))) if idx < 10: return "easy" if idx < 20: return "medium" return "hard" def _episode_row(env: OSINTEnvironment, info: dict[str, Any]) -> dict[str, Any]: if env.state is None: return { "task_id": "unknown", "task_type": "unknown", "question": "", "task_answer": str(info.get("task_answer", "")), "agent_answer": str(info.get("agent_answer", "")), "graph_f1": 0.0, "reward": float(info.get("total_reward", 0.0) or 0.0), "steps": int(info.get("step_count", 0) or 0), "tool_calls": int(info.get("tool_calls", 0) or 0), "success": int(info.get("agent_answer") == info.get("task_answer")), "reward_components": dict(info.get("reward_components", {})), "pred_edges": [], "truth_edges": [], } graph_f1 = compute_graph_f1(env.memory_graph.edges, env.state.task.supporting_edges) return { "task_id": env.state.task.task_id, "task_type": env.state.task.task_type, "question": env.state.task.question, "task_answer": str(info.get("task_answer", "")), "agent_answer": str(info.get("agent_answer", "")) if info.get("agent_answer") is not None else "", "graph_f1": graph_f1, "reward": float(info.get("total_reward", 0.0) or 0.0), "steps": int(info.get("step_count", 0) or 0), "tool_calls": int(info.get("tool_calls", 0) or 0), "success": int(info.get("agent_answer") == info.get("task_answer")), "reward_components": dict(info.get("reward_components", {})), "spawn_count": int(info.get("spawn_count", 0) or 0), "spawn_critical_steps": int(info.get("spawn_critical_steps", 0) or 0), "pred_edges": [ { "src": edge.src, "rel": edge.rel, "dst": edge.dst, "confidence": float(edge.confidence), } for edge in env.memory_graph.edges ], "truth_edges": [ { "src": edge.src, "rel": edge.rel, "dst": edge.dst, "confidence": float(edge.confidence), } for edge in env.state.task.supporting_edges ], } def _last_action_error(observation: Any, info: dict[str, Any]) -> str | None: raw = info.get("last_action_error") if isinstance(info, dict) else None if raw is not None: return str(raw) tool_outputs = getattr(observation, "tool_outputs", None) if isinstance(tool_outputs, list) and tool_outputs: last = tool_outputs[-1] if isinstance(last, dict): output = last.get("output") if isinstance(output, dict) and output.get("error") is not None: return str(output.get("error")) return None def _install_step_logger(env: OSINTEnvironment) -> tuple[list[float], dict[str, int], Any]: rewards: list[float] = [] counters = {"steps": 0} original_step = env.step def _logged_step(action: Any): observation, reward, done, info = original_step(action) counters["steps"] += 1 reward_value = float(reward or 0.0) rewards.append(reward_value) action_type = getattr(action, "action_type", "") action_type_value = str(getattr(action_type, "value", action_type)) action_text = _format_action( { "action_type": action_type_value, "payload": dict(getattr(action, "payload", {}) or {}), } ) log_step( step=counters["steps"], action=action_text, reward=reward_value, done=bool(done), error=_last_action_error(observation, info if isinstance(info, dict) else {}), ) return observation, reward, done, info env.step = _logged_step return rewards, counters, original_step def _validate_required_configuration() -> None: missing: list[str] = [] api_base = API_BASE_URL.strip() model_name = MODEL_NAME.strip() hf_token = HF_TOKEN.strip() api_key = API_KEY.strip() openai_key = OPENAI_API_KEY.strip() if not api_base or api_base == "": missing.append("API_BASE_URL") if not model_name or model_name == "": missing.append("MODEL_NAME") if not (hf_token or api_key or openai_key): missing.append("HF_TOKEN|API_KEY|OPENAI_API_KEY") # Required when using docker-image based env construction. if os.getenv("REQUIRE_LOCAL_IMAGE_NAME", "0").strip().lower() in {"1", "true", "yes", "on"}: if not LOCAL_IMAGE_NAME.strip(): missing.append("LOCAL_IMAGE_NAME") if missing: raise RuntimeError(f"Missing required environment variables: {', '.join(sorted(set(missing)))}") def _task_targets(env: OSINTEnvironment, episodes: int, task_indices: list[int]) -> list[int | None]: if task_indices: task_count = max(1, len(env.tasks)) return [index % task_count for index in task_indices] return [None] * max(1, episodes) def _run_with_runner( env: OSINTEnvironment, llm: Any, episodes: int, task_indices: list[int], ) -> tuple[dict[str, Any], list[dict[str, Any]], list[float], int]: metrics = EvalMetrics() episode_rows: list[dict[str, Any]] = [] rewards, counters, original_step = _install_step_logger(env) single_runner = SingleAgentRunner(env=env, llm=llm) swarm_runner = SwarmAgentRunner(env=env, llm=llm) if env.config.swarm.enabled else None try: for task_index in _task_targets(env, episodes, task_indices): task_count = max(1, len(env.tasks)) selected_index = env._task_idx % task_count if task_index is None else int(task_index) % task_count if task_index is not None: # Keep compatibility with explicit task selection from the previous inference script. env._task_idx = selected_index difficulty = _task_difficulty(env, selected_index) if difficulty == "easy": runner: SingleAgentRunner | SwarmAgentRunner = single_runner elif swarm_runner is not None: runner = swarm_runner else: runner = single_runner info = runner.run_episode() if env.state is None: continue graph_f1 = compute_graph_f1(env.memory_graph.edges, env.state.task.supporting_edges) metrics.add(info, task_type=env.state.task.task_type, graph_f1=graph_f1) episode_rows.append(_episode_row(env, info)) finally: env.step = original_step return metrics.summary(), episode_rows, rewards, int(counters["steps"]) def _maybe_write_artifacts( env: OSINTEnvironment, summary: dict[str, Any], episodes: int, episode_rows: list[dict[str, Any]], ) -> tuple[dict[str, Any] | None, str | None]: if not WRITE_BENCHMARK_ARTIFACTS: return None, None record = append_leaderboard_record( path=LEADERBOARD_PATH, summary=summary, episodes=episodes, run_name=RUN_NAME or None, config={ "seed": env.config.seed, "max_steps": env.config.max_steps, "swarm_enabled": env.config.swarm.enabled, "max_agents": env.config.swarm.max_agents, "max_breadth": env.config.swarm.max_breadth, "max_width": env.config.swarm.max_width, "max_depth": env.config.swarm.max_depth, "seeded_questions": len(env.config.seeding.seeded_questions), "llm_provider": env.config.llm.provider, "llm_model": env.config.llm.model, }, ) leaderboard = load_leaderboard(LEADERBOARD_PATH) dashboard = export_dashboard( env=env, evaluation={"summary": summary, "episodes": episode_rows}, leaderboard_records=leaderboard, output_path=DASHBOARD_PATH, ) return record, dashboard def main() -> None: _validate_required_configuration() env_cfg = _resolve_environment_config() llm_client = build_llm_client(env_cfg.llm) episodes_given = "EPISODES" in os.environ and str(os.getenv("EPISODES", "")).strip() != "" task_indices_given = bool(TASK_INDICES) if not episodes_given and not task_indices_given: runs: list[tuple[str, list[int], int]] = [ ("easy", list(range(0, 10)), 10), ("mid", list(range(10, 20)), 10), ("hard", list(range(20, 30)), 10), ] else: selected_indices = TASK_INDICES if task_indices_given else [] episodes = len(selected_indices) if selected_indices else max(1, EPISODES) runs = [(TASK_NAME, selected_indices, episodes)] for task_name, run_indices, run_episodes in runs: env: OSINTEnvironment | None = None rewards: list[float] = [] steps_taken = 0 score = 0.0 success = False env = OSINTEnvironment(env_cfg, llm=llm_client) log_start(task=task_name, env=BENCHMARK, model=env_cfg.llm.model) try: summary, episode_rows, rewards, steps_taken = _run_with_runner( env=env, llm=llm_client, episodes=run_episodes, task_indices=run_indices, ) score = float(summary.get("avg_reward", 0.0) or 0.0) score = max(0.0, min(1.0, score)) success = score >= SUCCESS_SCORE_THRESHOLD _maybe_write_artifacts( env=env, summary=summary, episodes=run_episodes, episode_rows=episode_rows, ) finally: if env is not None: close_fn = getattr(env, "close", None) if callable(close_fn): close_fn() log_end(task=task_name, success=success, steps=steps_taken, score=score, rewards=rewards) if __name__ == "__main__": main()