""" Inference Script for AdAudit =================================== MANDATORY - Before submitting, ensure the following variables are defined in your environment configuration: API_BASE_URL The API endpoint for the LLM. MODEL_NAME The model identifier to use for inference. HF_TOKEN Your Hugging Face / API key. LOCAL_IMAGE_NAME The name of the local image to use for the environment if you are using from_docker_image() method - Defaults are set only for API_BASE_URL and MODEL_NAME (and should reflect your active inference setup): API_BASE_URL = os.getenv("API_BASE_URL", "") MODEL_NAME = os.getenv("MODEL_NAME", "") - The inference script must be named `inference.py` and placed in the root directory of the project - Participants must use OpenAI Client for all LLM calls using above variables STDOUT FORMAT - The script must emit exactly three line types to stdout, in this order: [START] task= env= model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score= rewards= Rules: - One [START] line at episode begin. - One [STEP] line per step, immediately after env.step() returns. - One [END] line after env.close(), always emitted (even on exception). - reward and rewards are formatted to 2 decimal places. - done and success are lowercase booleans: true or false. - error is the raw last_action_error string, or null if none. - All fields on a single line with no newlines within a line. - Each tasks should return score in [0, 1] Example: [START] task=click-test env=miniwob model=Qwen3-VL-30B [STEP] step=1 action=click('123') reward=0.00 done=false error=null [STEP] step=2 action=fill('456','text') reward=0.00 done=false error=null [STEP] step=3 action=click('789') reward=1.00 done=true error=null [END] success=true steps=3 score=1.00 rewards=0.00,0.00,1.00 """ import json import os import re from typing import Any, Dict, List, Optional, Tuple from openai import OpenAI from server.Ad_Audit_environment import AdAuditEnv, EPISODE_DAYS from models import AdAuditAction # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-7B-Instruct") _VALID_TASKS = {"easy", "medium", "hard"} _task_env = os.getenv("ADAUDIT_TASK", "").strip().lower() TASK_NAME = _task_env if _task_env in _VALID_TASKS else "medium" BENCHMARK = os.getenv("ADAUDIT_BENCHMARK", "adaudit") TEMPERATURE = 0.0 MAX_TOKENS = 2048 HISTORY_WINDOW = 5 BASELINE_DAYS = 3 SUCCESS_SCORE_THRESHOLD = 0.4 # Rule-based investigation tools per fraud type TOOLS_FOR = { "bot_traffic": ["click_timestamps", "ip_distribution"], "click_injection": ["conversion_quality", "click_timestamps"], "domain_spoofing": ["referral_urls", "viewability_scores"], } SYSTEM_PROMPT = """\ You are an ad fraud analyst monitoring a 14-day campaign with multiple publishers. Each day you see metrics (impressions, clicks, conversions, CTR, CVR) and take ONE action. ## ACTIONS Use publisher IDs (pub_001, pub_002, etc.), never publisher names. 1. Monitor — observe metrics, take no action: {"action_type": "monitor"} 2. Investigate — run one tool on one publisher (costs 1 budget): {"action_type": "investigate_publisher", "publisher_id": "pub_001", "tool": "click_timestamps"} 3. Flag fraud — flag a publisher as fraudulent (irreversible): {"action_type": "flag_fraud", "publisher_id": "pub_001", "fraud_type": "bot_traffic", "evidence": ["click_timestamps", "ip_distribution"]} 4. Submit report — end the episode: {"action_type": "submit_report", "summary": "..."} Valid tools: click_timestamps, ip_distribution, device_fingerprints, \ referral_urls, viewability_scores, conversion_quality Valid fraud types: bot_traffic, domain_spoofing, click_injection ## STRATEGY 1. Monitor days 1-3 to establish baselines. Then compare each publisher's CTR/CVR against its own baseline. Only publishers with BIG shifts (CTR doubled, CVR crashed, impressions tripled) are suspicious. Stable metrics = clean. 2. Investigate suspicious publishers with 2 tools before flagging. bot_traffic = CTR spikes + CVR near zero. click_injection = CVR abnormally high (5%+). domain_spoofing = impressions surge + CVR drops. 3. False positives are heavily penalized. When in doubt, monitor. 4. Do not submit report until you are confident you have found and flagged ALL fraudsters. Respond with ONLY a JSON action in ```json``` markers. """ # --------------------------------------------------------------------------- # Logging helpers # --------------------------------------------------------------------------- def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: error_val = error if error else "null" done_val = str(done).lower() print(f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", flush=True) def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: rewards_str = ",".join(f"{r:.2f}" for r in rewards) print(f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_str}", flush=True) # --------------------------------------------------------------------------- # Observation formatting (for LLM) # --------------------------------------------------------------------------- def format_observation(obs_dict: Dict[str, Any], action_log: List[str] = None) -> str: parts: List[str] = [] parts.append(f"=== DAY {obs_dict['day']} of {obs_dict.get('campaign_day_total', 14)} ===\n") if action_log: parts.append("Past actions: " + " | ".join(action_log)) parts.append("") if obs_dict.get("daily_metrics"): parts.append("Metrics:") parts.append(f"{'ID':<10} {'Publisher':<22} {'Impressions':>12} {'Clicks':>8} {'Conversions':>12} {'Spend ($)':>10} {'CTR':>7} {'CVR':>7}") for m in obs_dict["daily_metrics"]: parts.append( f"{m['publisher_id']:<10} {m['name']:<22} {m['impressions']:>12,} {m['clicks']:>8,} " f"{m['conversions']:>12,} {m['spend']:>10,.2f} {m['ctr']:>6.2%} {m['cvr']:>6.2%}" ) parts.append("") if obs_dict.get("investigation_results"): inv = obs_dict["investigation_results"] if isinstance(inv, dict): if "error" in inv: parts.append(f"Investigation ERROR: {inv['error']}") else: parts.append(f"Investigation ({inv.get('tool', '?')}) for {inv.get('publisher_id', '?')}:") for k, v in inv.items(): if k not in ("tool", "publisher_id"): parts.append(f" {k}: {v}") parts.append("") pub_status = obs_dict.get("publisher_status", {}) if pub_status: flagged_pubs = [pid for pid, s in pub_status.items() if s == "flagged"] active = [pid for pid, s in pub_status.items() if s == "active"] if flagged_pubs: parts.append(f"FLAGGED: {', '.join(flagged_pubs)}") parts.append(f"Active publishers: {', '.join(active)}") b = obs_dict.get("budget_status", {}) if b: parts.append( f"Budget: ${b.get('remaining',0):,.0f} remaining | " f"Investigations left: {b.get('investigation_budget_remaining', 0)}" ) return "\n".join(parts) # --------------------------------------------------------------------------- # Action parsing (for LLM) # --------------------------------------------------------------------------- def parse_action(text: str) -> Dict[str, Any]: match = re.search(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL) if match: try: return json.loads(match.group(1)) except json.JSONDecodeError: pass for line in reversed(text.strip().split("\n")): line = line.strip() if line.startswith("{") and line.endswith("}"): try: return json.loads(line) except json.JSONDecodeError: continue return {"action_type": "monitor"} # --------------------------------------------------------------------------- # LLM action # --------------------------------------------------------------------------- def get_llm_action( client: OpenAI, obs_text: str, history: List[Dict[str, str]], ) -> Optional[Dict[str, Any]]: """Try to get an action from the LLM. Returns None on failure.""" messages = [{"role": "system", "content": SYSTEM_PROMPT}] messages.extend(history) messages.append({"role": "user", "content": obs_text}) try: response = client.chat.completions.create( model=MODEL_NAME, messages=messages, temperature=TEMPERATURE, max_tokens=MAX_TOKENS, ) text = response.choices[0].message.content or "" return parse_action(text) except Exception: return None # --------------------------------------------------------------------------- # Rule-based fallback # --------------------------------------------------------------------------- def score_publishers(metrics_history: Dict[str, List[Dict]]) -> List[Tuple[str, float, str]]: """Score each publisher's suspicion based on accumulated metrics.""" results = [] for pid, hist in metrics_history.items(): if len(hist) < 2: continue recent = hist[-3:] avg_ctr = sum(h["ctr"] for h in recent) / len(recent) avg_cvr = sum(h["cvr"] for h in recent) / len(recent) baseline = hist[:2] base_ctr = sum(h["ctr"] for h in baseline) / len(baseline) base_cvr = sum(h["cvr"] for h in baseline) / len(baseline) score = 0.0 fraud_type = "bot_traffic" if base_ctr > 0: ctr_rise = avg_ctr / base_ctr if ctr_rise > 1.5: score += 0.4 if ctr_rise > 2.0: score += 0.3 if base_cvr > 0: cvr_drop = avg_cvr / base_cvr if cvr_drop < 0.5: score += 0.3 if cvr_drop < 0.2: score += 0.3 if avg_cvr > 0.04: score += 0.5 fraud_type = "click_injection" if base_ctr > 0 and base_cvr > 0: ctr_rise = avg_ctr / base_ctr cvr_drop = avg_cvr / base_cvr if 1.2 < ctr_rise < 2.0 and 0.2 < cvr_drop < 0.6: if fraud_type == "bot_traffic" and score < 0.5: fraud_type = "domain_spoofing" if score > 0.2: results.append((pid, score, fraud_type)) results.sort(key=lambda x: -x[1]) return results def get_rule_action( obs_dict: Dict[str, Any], metrics_history: Dict[str, List[Dict]], investigated: Dict[str, List[str]], flagged: set, ) -> Dict[str, Any]: """Deterministic rule-based action selection.""" day = obs_dict["day"] budget_left = obs_dict.get("budget_status", {}).get("investigation_budget_remaining", 0) if day <= BASELINE_DAYS: return {"action_type": "monitor"} suspects = score_publishers(metrics_history) suspects = [(pid, sc, ft) for pid, sc, ft in suspects if pid not in flagged] for pid, _, ft in suspects: tools_done = investigated.get(pid, []) if len(tools_done) >= 2: flagged.add(pid) return { "action_type": "flag_fraud", "publisher_id": pid, "fraud_type": ft, "evidence": tools_done, } if budget_left > 0 and suspects: for pid, _, ft in suspects: if pid in flagged: continue tools_done = investigated.get(pid, []) tools_to_try = TOOLS_FOR.get(ft, TOOLS_FOR["bot_traffic"]) for tool in tools_to_try: if tool not in tools_done: investigated.setdefault(pid, []).append(tool) return { "action_type": "investigate_publisher", "publisher_id": pid, "tool": tool, } return {"action_type": "monitor"} # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def run_episode(task_name: str, llm_client: Optional[OpenAI]) -> None: use_rules = llm_client is None env = AdAuditEnv() rewards: List[float] = [] steps_taken = 0 score = 0.0 success = False # LLM state history: List[Dict[str, str]] = [] action_log: List[str] = [] # Rule-based state metrics_history: Dict[str, List[Dict]] = {} investigated: Dict[str, List[str]] = {} flagged: set = set() log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME if not use_rules else "rule-based") try: obs = env.reset(episode_id=task_name) obs_dict = obs.model_dump() while not obs_dict.get("done", False) and steps_taken < EPISODE_DAYS: # Track metrics for rule-based fallback for m in obs_dict.get("daily_metrics", []): metrics_history.setdefault(m["publisher_id"], []).append(m) action = None if not use_rules: obs_text = format_observation(obs_dict, action_log) action = get_llm_action(llm_client, obs_text, history[-HISTORY_WINDOW * 2:]) if action is None: use_rules = True if action is None: action = get_rule_action(obs_dict, metrics_history, investigated, flagged) # Validate action try: action_obj = AdAuditAction(**action) except Exception: action_obj = AdAuditAction(action_type="invalid") # Build action log entry log_entry = f"D{obs_dict['day']}:{action_obj.action_type}" if action_obj.publisher_id: log_entry += f"({action_obj.publisher_id}" if action_obj.tool: log_entry += f",{action_obj.tool}" if action_obj.fraud_type: log_entry += f",{action_obj.fraud_type}" log_entry += ")" action_log.append(log_entry) # Update LLM history if not use_rules: history.append({"role": "user", "content": obs_text}) history.append({"role": "assistant", "content": json.dumps(action)}) # Step environment obs = env.step(action_obj) obs_dict = obs.model_dump() steps_taken += 1 reward = obs_dict.get("reward", 0.0) done = obs_dict.get("done", False) error = None rewards.append(reward) action_str = json.dumps(action, separators=(",", ":")) log_step(step=steps_taken, action=action_str, reward=reward, done=done, error=error) # Final grading state = env.state grader = state.grader_inputs score = grader.get("final_score", 0.0) score = min(max(score, 0.0), 1.0) success = score >= SUCCESS_SCORE_THRESHOLD finally: log_end(success=success, steps=steps_taken, score=score, rewards=rewards) def main() -> None: # Try to init LLM client; fall back to rule-based if it fails llm_client: Optional[OpenAI] = None try: llm_client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) llm_client.models.list() except Exception: llm_client = None for task in sorted(_VALID_TASKS): run_episode(task, llm_client) if __name__ == "__main__": main()