Spaces:
Sleeping
Sleeping
| """ | |
| Baseline runner for the Pharmacovigilance Signal Detector submission. | |
| This script queries a chat model through the OpenAI client, sends its decision | |
| to the environment server, and prints the exact machine-readable lines expected | |
| by the evaluator. | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| from typing import Any, Iterable, List | |
| import requests | |
| from pydantic import ValidationError | |
| try: | |
| from .graders import TASK_TO_GRADER | |
| from .models import PharmaAction | |
| except ImportError: | |
| from graders import TASK_TO_GRADER | |
| from models import PharmaAction | |
# Endpoint, model and credentials; each one can be overridden via the environment.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# HF_TOKEN takes precedence; API_KEY is consulted only when HF_TOKEN is unset or empty.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY")
# Trailing slashes are removed so endpoint paths can be appended with a single "/".
ENV_URL = os.environ.get("ENV_URL", "http://localhost:7860").rstrip("/")
# A non-empty TASK_NAME replaces whatever task subset is selected on the command line.
TASK_OVERRIDE = os.environ.get("TASK_NAME", "").strip()
BENCHMARK = "pharma-vigilance"

# Difficulty label -> tuple of task identifiers to run.
TASK_SETS = {
    "easy": ("known_signal_easy",),
    "medium": ("cluster_signal_medium",),
    "hard": ("confounded_hard",),
}
# "all" is the three single-task subsets in easy -> medium -> hard order.
TASK_SETS["all"] = TASK_SETS["easy"] + TASK_SETS["medium"] + TASK_SETS["hard"]
# System prompt sent with every request: it restricts the reply to a single
# JSON object with a fixed key set and closed vocabularies for the enum fields.
SYSTEM_MESSAGE = "\n".join(
    (
        "You are acting as a pharmacovigilance triage analyst.",
        "Read the synthetic case bundle and reply with exactly one JSON object.",
        "Allowed keys:",
        "- classification",
        "- suspect_drug",
        "- severity_assessment",
        "- recommended_action",
        "- reasoning",
        "- confidence",
        "Allowed values:",
        "- classification: new_signal, known_side_effect, noise, duplicate",
        "- severity_assessment: mild, moderate, severe, critical",
        "- recommended_action: escalate, log_and_monitor, dismiss, request_more_info",
        "- confidence: integer from 0 to 100",
        "No markdown. No explanation outside the JSON object.",
    )
)
def emit_start(task_name: str) -> None:
    """Print the ``[START]`` marker line for one task run.

    The line carries the task name, the benchmark identifier and the model
    name, and stdout is flushed immediately.
    """
    header = f"[START] task={task_name} env={BENCHMARK} model={MODEL_NAME}"
    print(header, flush=True)
def emit_step(
    step_no: int,
    action_text: str,
    reward: float,
    done: bool,
    error: str | None,
) -> None:
    """Print one ``[STEP]`` record as a single line on stdout.

    Args:
        step_no: Step counter to report.
        action_text: Short label describing the action taken.
        reward: Reward for the step, printed with two decimals.
        done: Whether the episode finished; printed as ``true``/``false``.
        error: Error description, or ``None``/empty for no error, in which
            case the literal ``null`` is printed.
    """
    # Exception messages can span several lines. Embedding them verbatim would
    # split the record over multiple stdout lines, so line breaks are replaced
    # with single spaces. Messages without line breaks are printed unchanged.
    error_text = " ".join(error.splitlines()) if error else ""
    if not error_text:
        error_text = "null"
    print(
        f"[STEP] step={step_no} action={action_text} reward={reward:.2f} "
        f"done={str(done).lower()} error={error_text}",
        flush=True,
    )
def emit_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the closing ``[END]`` summary line for a task run.

    Args:
        success: Overall outcome; printed lower-cased (``true``/``false``).
        steps: Number of steps to report.
        score: Final score, printed with six decimals.
        rewards: Per-step rewards, printed comma-separated with two decimals.
    """
    success_text = str(success).lower()
    reward_text = ",".join(map("{:.2f}".format, rewards))
    summary = (
        f"[END] success={success_text} steps={steps} "
        f"score={score:.6f} rewards={reward_text}"
    )
    print(summary, flush=True)
def choose_tasks(selection: str) -> Iterable[str]:
    """Return the task identifiers to run.

    A non-empty ``TASK_OVERRIDE`` wins and yields exactly that one task;
    otherwise the subset registered under ``selection`` in ``TASK_SETS`` is
    returned (an unknown key raises ``KeyError``).
    """
    return (TASK_OVERRIDE,) if TASK_OVERRIDE else TASK_SETS[selection]
def client() -> Any:
    """Build the OpenAI client pointed at ``API_BASE_URL``.

    Raises:
        EnvironmentError: If neither credential variable supplied a token.
    """
    if HF_TOKEN:
        # Imported lazily, so the package is only needed once a token is present.
        from openai import OpenAI

        return OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
    raise EnvironmentError("HF_TOKEN or API_KEY must be set before running inference.py")
def fetch_reset(task_name: str) -> dict:
    """POST to the environment's ``/reset`` endpoint for ``task_name``.

    Returns the decoded JSON body. An HTTP error status raises through
    ``raise_for_status``; the request times out after 30 seconds.
    """
    reset_url = f"{ENV_URL}/reset"
    body = {"task_id": task_name}
    reply = requests.post(reset_url, json=body, timeout=30)
    reply.raise_for_status()
    return reply.json()
def submit_action(action: PharmaAction) -> dict:
    """POST the action to the environment's ``/step`` endpoint.

    The action is serialized with ``model_dump()`` and sent under the
    ``"action"`` key. Returns the decoded JSON body. An HTTP error status
    raises through ``raise_for_status``; the request times out after 30 seconds.
    """
    step_url = f"{ENV_URL}/step"
    body = {"action": action.model_dump()}
    reply = requests.post(step_url, json=body, timeout=30)
    reply.raise_for_status()
    return reply.json()
def prompt_for_case(observation: dict) -> str:
    """Render the user prompt for one case.

    The observation is embedded as ASCII-only, 2-space-indented JSON between a
    short instruction header and a closing guidance sentence.
    """
    case_json = json.dumps(observation, ensure_ascii=True, indent=2)
    header = (
        "Assess the following pharmacovigilance case.\n\n"
        "Return one final structured judgment.\n\n"
    )
    guidance = (
        "Focus on whether the case is novel or known, the most plausible causal "
        "drug or interaction, the right severity band, and the operational next step."
    )
    return f"{header}{case_json}\n\n{guidance}"
def _parse_model_json(text: str) -> Any:
    """Decode a model reply as JSON, tolerating text wrapped around the object."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as first_error:
        # Chat models often wrap the object in a Markdown fence or add a
        # sentence around it despite the instructions. Retry on the outermost
        # "{...}" span before giving up.
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end < start:
            raise
        try:
            return json.loads(text[start : end + 1])
        except json.JSONDecodeError:
            # Report the failure for the reply as received, not for the
            # extracted fragment.
            raise first_error from None


def ask_model(llm: Any, observation: dict) -> PharmaAction:
    """Ask the chat model for a judgment on ``observation``.

    Sends the system message plus the rendered case prompt as a single
    non-streaming, temperature-0 completion capped at 260 tokens, then decodes
    the reply into a ``PharmaAction``.

    Args:
        llm: Client exposing ``chat.completions.create``.
        observation: Case data, embedded into the prompt as JSON.

    Returns:
        The action built from the decoded reply.

    Raises:
        json.JSONDecodeError: If no JSON value can be decoded from the reply,
            either directly or from its outermost ``{...}`` span.
        Any exception raised by the client call or by constructing
        ``PharmaAction`` from the payload propagates unchanged.
    """
    completion = llm.chat.completions.create(
        model=MODEL_NAME,
        messages=[
            {"role": "system", "content": SYSTEM_MESSAGE},
            {"role": "user", "content": prompt_for_case(observation)},
        ],
        temperature=0.0,
        max_tokens=260,
        stream=False,
    )
    # The message content may be None; treat that as an empty reply.
    text = (completion.choices[0].message.content or "").strip()
    payload = _parse_model_json(text)
    return PharmaAction(**payload)
def compact_action(action: PharmaAction) -> str:
    """Return a short label for ``action``.

    The label is the classification, followed by ``/<suspect_drug>`` when a
    suspect drug is set (non-empty).
    """
    drug = action.suspect_drug
    if not drug:
        return action.classification
    return f"{action.classification}/{drug}"
def final_score(task_name: str, rewards: List[float]) -> float:
    """Compute the final score for a task from its step rewards.

    When a grader is registered for ``task_name`` in ``TASK_TO_GRADER`` it is
    called with ``{"rewards": rewards}`` and its result is converted to float.
    Otherwise the score is the mean reward (0.0 for no rewards), rounded to
    four decimals and clamped to the range 0.01..0.99.
    """
    grader = TASK_TO_GRADER.get(task_name)
    if grader is not None:
        return float(grader({"rewards": rewards}))

    mean_reward = sum(rewards) / len(rewards) if rewards else 0.0
    rounded = round(mean_reward, 4)
    return min(max(rounded, 0.01), 0.99)
def run_one_task(llm: Any, task_name: str) -> None:
    """Run one task episode and print its ``[START]``/``[STEP]``/``[END]`` lines.

    The environment is reset, then the model is queried and its action
    submitted until the environment reports ``done``. Each step's reward is
    taken from the ``"total"`` entry when the reward payload is a dict,
    otherwise from the payload itself. The run counts as a success when the
    final score is at least 0.60.

    An exception ends the episode. It is reported as one extra, terminal
    ``[STEP]`` record with reward 0.0, numbered after the steps already
    emitted, so the step numbers and the ``[END]`` summary stay consistent
    with what was printed. Score and success stay at 0.0/false in that case.

    Args:
        llm: Client passed through to ``ask_model``.
        task_name: Task identifier sent to the environment on reset.
    """
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    # (action label, error text) once an exception has ended the episode.
    failure = None

    emit_start(task_name)
    try:
        try:
            result = fetch_reset(task_name)
            done = bool(result.get("done", False))
            while not done:
                # The latest environment response is the next observation.
                observation = result
                action = ask_model(llm, observation)
                action_text = compact_action(action)
                result = submit_action(action)
                reward_payload = result.get("reward", {})
                reward = (
                    float(reward_payload.get("total", 0.0))
                    if isinstance(reward_payload, dict)
                    else float(reward_payload)
                )
                done = bool(result.get("done", False))
                rewards.append(reward)
                steps_taken += 1
                emit_step(steps_taken, action_text, reward, done, None)
            score = final_score(task_name, rewards)
            success = score >= 0.60
        except json.JSONDecodeError:
            failure = ("parse_error", "parse_error")
        except ValidationError:
            failure = ("schema_error", "schema_error")
        except Exception as exc:
            failure = ("error", str(exc))

        if failure is not None:
            # Append the failure as the next step rather than resetting the
            # history; earlier [STEP] lines have already been printed.
            label, message = failure
            rewards.append(0.0)
            steps_taken += 1
            emit_step(steps_taken, label, 0.0, True, message)
    finally:
        # Always close the record, even if something unexpected escapes.
        emit_end(success, steps_taken, score, rewards or [0.0])
def main() -> None:
    """Parse the command line, build the model client, and run each selected task."""
    cli = argparse.ArgumentParser(description="Run the pharmacovigilance baseline agent")
    cli.add_argument(
        "--difficulty",
        choices=["easy", "medium", "hard", "all"],
        default="all",
        help="Which task subset to run",
    )
    options = cli.parse_args()

    # Build the client before selecting tasks, so missing credentials fail first.
    llm = client()
    selected_tasks = choose_tasks(options.difficulty)
    for task_name in selected_tasks:
        run_one_task(llm, task_name)


if __name__ == "__main__":
    main()