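"""OpenAI chat-completions baseline runner for the synthetic OSINT OpenEnv benchmark."""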
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from time import perf_counter
from typing import Any
from osint_env.config import clone_environment_config, load_seeding_config, load_shared_config
from osint_env.domain.models import Action, ActionType
from osint_env.env.environment import OSINTEnvironment
from osint_env.env.reward import compute_graph_f1
from osint_env.eval.leaderboard import append_leaderboard_record, load_leaderboard
from osint_env.eval.metrics import EvalMetrics
from osint_env.platforms.tool_schemas import build_action_tools
from osint_env.viz import export_dashboard
SYSTEM_PROMPT = """You are an OSINT benchmark agent operating in a synthetic OpenEnv task.
Available actions are provided as function tools. On every turn, call exactly one tool.
Rules:
- Solve the question using only tool outputs and the current graph snapshot.
- When you have enough evidence, call submit_answer with the exact node id string.
- Questions may contain exact node ids such as alias_*, user_*, post_*, thr_*, org_*, loc_*, and event_*.
- Prefer direct id lookups when an exact id is present in the question.
- get_post and get_thread retrieve exact seeded records by id.
- Use add_edge only for relationships strongly supported by the evidence you have already collected.
- Prefer concise, high-signal tool queries.
- Never guess free-form prose when a node id answer is required.
"""
@dataclass(slots=True)
class OpenAIBaselineConfig:
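    """Settings for an OpenAI baseline run over the fixed-levels dataset: paths, model, sampling, and episode/step budgets."""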
shared_config_path: str = "datasets/fixed_levels/shared_config_fixed_levels.json"
seed_file: str = "datasets/fixed_levels/seed_fixed_levels.json"
output_path: str = "artifacts/baselines/openai_fixed_levels_latest.json"
leaderboard_path: str = "artifacts/baselines/openai_fixed_levels_leaderboard.json"
dashboard_path: str = "artifacts/baselines/openai_fixed_levels_dashboard.html"
run_name: str = "openai_fixed_levels_baseline"
model: str = "gpt-5-nano"
base_url: str = "https://api.openai.com/v1"
api_key: str = ""
api_key_env: str = "OPENAI_API_KEY"
temperature: float = 0.0
max_tokens: int = 256
timeout_seconds: int = 60
episodes: int = 30
max_steps: int = 8
seed: int | None = 7
append_leaderboard: bool = True
def _message_text(message: Any) -> str:
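    """Flatten a chat message's content, which may be a plain string or a list of content parts, into a single text string."""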
content = getattr(message, "content", "")
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
parts.append(str(item.get("text", "")))
else:
text = getattr(item, "text", None)
if text:
parts.append(str(text))
return "\n".join(part for part in parts if part)
return str(content or "")
def _safe_info(info: dict[str, Any]) -> dict[str, Any]:
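    """Pull a small, JSON-serializable subset of the environment info dict for traces and payloads."""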
return {
"step_count": int(info.get("step_count", 0)),
"total_reward": float(info.get("total_reward", 0.0)),
"tool_calls": int(info.get("tool_calls", 0)),
"redundant_tool_calls": int(info.get("redundant_tool_calls", 0)),
"reward_components": dict(info.get("reward_components", {})),
}
def _observation_payload(env: OSINTEnvironment, observation: Any, step_limit: int) -> dict[str, Any]:
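    """Serialize the current observation (task fields, remaining step budget, recent tool outputs, graph snapshot, action history) for the model."""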
task = dict(observation.task)
return {
"task": {
"task_id": task.get("task_id", ""),
"task_type": task.get("task_type", ""),
"question": task.get("question", ""),
},
"remaining_steps": max(0, step_limit - int(env.state.step_count if env.state else 0)),
"recent_tool_outputs": list(observation.tool_outputs),
"graph_snapshot": dict(observation.graph_snapshot),
"recent_action_history": list(observation.action_history),
}
class OpenAIBaselineRunner:
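    """Drives the OSINT environment with an OpenAI chat model as the policy, forcing exactly one tool call per turn."""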
def __init__(self, config: OpenAIBaselineConfig):
self.config = config
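        # Imported lazily so the OpenAI SDK is only required when this baseline actually runs.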
from openai import OpenAI
if not config.api_key:
raise ValueError(
"OpenAI baseline requires an API key. "
f"Set {config.api_key_env} or pass --openai-api-key."
)
self.client = OpenAI(
api_key=config.api_key,
base_url=config.base_url,
timeout=config.timeout_seconds,
)
self.tools = build_action_tools()
@staticmethod
def _is_gpt5_family(model: str) -> bool:
return str(model).strip().lower().startswith("gpt-5")
@staticmethod
def _supports_reasoning_effort_in_chat_completions(model: str) -> bool:
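        """Whether the model should receive a reasoning_effort argument in chat completions (GPT-5 family, excluding the gpt-5.4-mini prefix)."""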
model_name = str(model).strip().lower()
if model_name.startswith("gpt-5.4-mini"):
return False
return model_name.startswith("gpt-5")
def _request_kwargs(self, messages: list[dict[str, Any]], episode_index: int) -> dict[str, Any]:
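        """Build the chat.completions.create kwargs: forced single tool call, completion-token cap, optional per-episode seed, and GPT-5 compatibility handling."""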
kwargs: dict[str, Any] = {
"model": self.config.model,
"messages": messages,
"tools": self.tools,
"tool_choice": "required",
"parallel_tool_calls": False,
"max_completion_tokens": self.config.max_tokens,
}
if self.config.seed is not None:
kwargs["seed"] = int(self.config.seed) + episode_index
if self._is_gpt5_family(self.config.model):
            # GPT-5 family chat-completions compatibility: these models already get
            # max_completion_tokens above; prefer reasoning_effort where the model
            # accepts it and only fall back to a temperature override otherwise.
if self._supports_reasoning_effort_in_chat_completions(self.config.model):
kwargs["reasoning_effort"] = "none"
else:
kwargs["temperature"] = self.config.temperature
return kwargs
def _build_environment(self) -> OSINTEnvironment:
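        """Build an OSINTEnvironment from the shared config and seed file, capping max_steps at the configured budget."""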
shared = load_shared_config(self.config.shared_config_path)
env_cfg = clone_environment_config(shared.environment)
env_cfg.seeding = load_seeding_config(self.config.seed_file)
env_cfg.llm.provider = "mock"
env_cfg.llm.model = self.config.model
env_cfg.llm.temperature = self.config.temperature
env_cfg.llm.max_tokens = self.config.max_tokens
env_cfg.max_steps = min(int(env_cfg.max_steps), int(self.config.max_steps))
return OSINTEnvironment(env_cfg)
def _execute_action(
self,
env: OSINTEnvironment,
tool_name: str,
args: dict[str, Any],
) -> tuple[Any, float, bool, dict[str, Any], dict[str, Any]]:
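        """Translate a tool call into the matching environment Action (submit_answer, add_edge, or a generic tool call) and return the step outcome plus a result payload."""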
if tool_name == "submit_answer":
answer = str(args.get("answer", "")).strip()
obs, reward, done, info = env.step(Action(ActionType.ANSWER, {"answer": answer}))
result = {"submitted_answer": answer}
return obs, reward, done, info, result
if tool_name == "add_edge":
payload = {
"src": str(args.get("src", "")).strip(),
"rel": str(args.get("rel", "")).strip(),
"dst": str(args.get("dst", "")).strip(),
"confidence": float(args.get("confidence", 1.0)),
}
obs, reward, done, info = env.step(Action(ActionType.ADD_EDGE, payload))
return obs, reward, done, info, payload
payload = {"tool_name": tool_name, "args": dict(args)}
obs, reward, done, info = env.step(Action(ActionType.CALL_TOOL, payload))
result = obs.tool_outputs[-1]["output"] if obs.tool_outputs else {}
return obs, reward, done, info, result
def _episode(self, env: OSINTEnvironment, episode_index: int) -> tuple[dict[str, Any], dict[str, Any]]:
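        """Run one episode: request a forced tool call each turn, execute it in the
        environment, and force-submit "unknown" if the step budget is exhausted.
        Returns the final env info (with usage and fingerprints attached) and a trace.
        """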
obs = env.reset()
initial_observation = _observation_payload(env, obs, env.config.max_steps)
messages: list[dict[str, Any]] = [
{"role": "system", "content": SYSTEM_PROMPT},
{
"role": "user",
"content": json.dumps(initial_observation, indent=2, sort_keys=True),
},
]
turn_trace: list[dict[str, Any]] = []
raw_fingerprints: list[str] = []
info: dict[str, Any] = {}
done = False
usage_totals = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
while not done and env.state is not None and env.state.step_count < env.config.max_steps:
completion = self.client.chat.completions.create(**self._request_kwargs(messages, episode_index))
if getattr(completion, "system_fingerprint", None):
raw_fingerprints.append(str(completion.system_fingerprint))
if getattr(completion, "usage", None) is not None:
usage_totals["prompt_tokens"] += int(getattr(completion.usage, "prompt_tokens", 0) or 0)
usage_totals["completion_tokens"] += int(getattr(completion.usage, "completion_tokens", 0) or 0)
usage_totals["total_tokens"] += int(getattr(completion.usage, "total_tokens", 0) or 0)
message = completion.choices[0].message
content = _message_text(message)
tool_calls = list(message.tool_calls or [])
if not tool_calls:
fallback_answer = content.strip() or "unknown"
obs, reward, done, info = env.step(Action(ActionType.ANSWER, {"answer": fallback_answer}))
tool_result = {
"submitted_answer": fallback_answer,
"reward": reward,
"done": done,
"observation": _observation_payload(env, obs, env.config.max_steps),
"info": _safe_info(info),
}
messages.append({"role": "assistant", "content": content})
messages.append({"role": "tool", "tool_call_id": "fallback_submit", "content": json.dumps(tool_result)})
turn_trace.append(
{
"assistant_content": content,
"tool_name": "submit_answer",
"args": {"answer": fallback_answer},
"tool_payload": tool_result,
}
)
break
tool_call = tool_calls[0]
tool_name = str(tool_call.function.name)
try:
args = json.loads(tool_call.function.arguments or "{}")
except json.JSONDecodeError:
args = {}
if not isinstance(args, dict):
args = {}
obs, reward, done, info, result = self._execute_action(env, tool_name, args)
tool_payload = {
"tool_name": tool_name,
"args": args,
"result": result,
"reward": reward,
"done": done,
"observation": _observation_payload(env, obs, env.config.max_steps),
"info": _safe_info(info),
}
assistant_message = {
"role": "assistant",
"content": content,
"tool_calls": [
{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_name,
"arguments": json.dumps(args, sort_keys=True),
},
}
],
}
messages.append(assistant_message)
messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(tool_payload, sort_keys=True)})
turn_trace.append(
{
"assistant_content": content,
"tool_name": tool_name,
"args": args,
"reward": reward,
"done": done,
"tool_payload": tool_payload,
}
)
if not done:
            obs, reward, done, info = env.step(Action(ActionType.ANSWER, {"answer": "unknown"}))
            final_payload = {
                "submitted_answer": "unknown",
                "reward": reward,
                "done": done,
                "observation": _observation_payload(env, obs, env.config.max_steps),
                "info": _safe_info(info),
            }
            turn_trace.append(
                {
                    "assistant_content": "",
                    "tool_name": "submit_answer",
                    "args": {"answer": "unknown"},
                    "reward": reward,
                    "done": done,
                    "tool_payload": final_payload,
                }
            )
info = dict(info)
info["openai_system_fingerprints"] = raw_fingerprints
info["usage"] = usage_totals
return info, {"initial_observation": initial_observation, "turns": turn_trace}
def run(self) -> dict[str, Any]:
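        """Run all episodes, aggregate metrics, optionally append a leaderboard record, export the dashboard, and write the full JSON payload to output_path."""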
env = self._build_environment()
metrics = EvalMetrics()
episode_rows: list[dict[str, Any]] = []
started = perf_counter()
run_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
for episode_index in range(int(self.config.episodes)):
info, trace = self._episode(env, episode_index)
episode_usage = dict(info.get("usage", {}))
for key in run_usage:
run_usage[key] += int(episode_usage.get(key, 0) or 0)
task_type = env.state.task.task_type if env.state else "unknown"
task_id = env.state.task.task_id if env.state else f"episode_{episode_index}"
truth = env.state.task.supporting_edges if env.state else []
pred = env.memory_graph.edges if env.state else []
graph_f1 = compute_graph_f1(pred, truth)
metrics.add(info, task_type=task_type, graph_f1=graph_f1)
episode_rows.append(
{
"task_id": task_id,
"task_type": task_type,
"question": env.state.task.question if env.state else "",
"task_answer": str(info.get("task_answer", "")),
"agent_answer": str(info.get("agent_answer", "")) if info.get("agent_answer") is not None else "",
"graph_f1": graph_f1,
"reward": float(info.get("total_reward", 0.0)),
"steps": int(info.get("step_count", 0)),
"tool_calls": int(info.get("tool_calls", 0)),
"success": int(info.get("agent_answer") == info.get("task_answer")),
"reward_components": dict(info.get("reward_components", {})),
"pred_edges": [
{
"src": edge.src,
"rel": edge.rel,
"dst": edge.dst,
"confidence": float(edge.confidence),
}
for edge in pred
],
"truth_edges": [
{
"src": edge.src,
"rel": edge.rel,
"dst": edge.dst,
"confidence": float(edge.confidence),
}
for edge in truth
],
"trace": trace,
"openai_system_fingerprints": list(info.get("openai_system_fingerprints", [])),
"usage": episode_usage,
}
)
summary = metrics.summary()
duration_seconds = perf_counter() - started
if self.config.append_leaderboard:
record = append_leaderboard_record(
path=self.config.leaderboard_path,
summary=summary,
episodes=int(self.config.episodes),
run_name=self.config.run_name,
config={
"provider": "openai",
"model": self.config.model,
"seed": self.config.seed,
"max_steps": self.config.max_steps,
"shared_config_path": self.config.shared_config_path,
"seed_file": self.config.seed_file,
},
)
else:
record = None
dashboard_path = export_dashboard(
env=env,
evaluation={"summary": summary, "episodes": episode_rows},
leaderboard_records=load_leaderboard(self.config.leaderboard_path),
output_path=self.config.dashboard_path,
)
payload: dict[str, Any] = {
"run": {
"name": self.config.run_name,
"model": self.config.model,
"episodes": int(self.config.episodes),
"temperature": float(self.config.temperature),
"max_tokens": int(self.config.max_tokens),
"timeout_seconds": int(self.config.timeout_seconds),
"max_steps": int(self.config.max_steps),
"seed": self.config.seed,
"shared_config_path": self.config.shared_config_path,
"seed_file": self.config.seed_file,
"duration_seconds": duration_seconds,
"dashboard_path": dashboard_path,
},
"summary": summary,
"usage": run_usage,
"episodes": episode_rows,
}
        if record is not None:
            payload["record"] = record
        output = Path(self.config.output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
        return payload
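

if __name__ == "__main__":  # pragma: no cover
    # Minimal usage sketch, not one of the project's official entry points: it assumes
    # OPENAI_API_KEY is exported and that the default dataset paths in
    # OpenAIBaselineConfig exist relative to the working directory.
    import os

    _config = OpenAIBaselineConfig(api_key=os.environ.get("OPENAI_API_KEY", ""))
    _runner = OpenAIBaselineRunner(_config)
    _payload = _runner.run()
    print(json.dumps(_payload["summary"], indent=2, sort_keys=True))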