| """ |
| Inference Script for AdAudit |
| =================================== |
| MANDATORY |
| - Before submitting, ensure the following variables are defined in your environment configuration: |
| API_BASE_URL The API endpoint for the LLM. |
| MODEL_NAME The model identifier to use for inference. |
| HF_TOKEN Your Hugging Face / API key. |
| LOCAL_IMAGE_NAME The name of the local image to use for the environment if you are using from_docker_image() |
| method |
| |
| - Defaults are set only for API_BASE_URL and MODEL_NAME |
| (and should reflect your active inference setup): |
| API_BASE_URL = os.getenv("API_BASE_URL", "<your-active-endpoint>") |
| MODEL_NAME = os.getenv("MODEL_NAME", "<your-active-model>") |
| |
| - The inference script must be named `inference.py` and placed in the root directory of the project |
| - Participants must use OpenAI Client for all LLM calls using above variables |
| |
| STDOUT FORMAT |
| - The script must emit exactly three line types to stdout, in this order: |
| |
| [START] task=<task_name> env=<benchmark> model=<model_name> |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> |
| [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn> |
| |
| Rules: |
| - One [START] line at episode begin. |
| - One [STEP] line per step, immediately after env.step() returns. |
| - One [END] line after env.close(), always emitted (even on exception). |
| - reward and rewards are formatted to 2 decimal places. |
| - done and success are lowercase booleans: true or false. |
| - error is the raw last_action_error string, or null if none. |
| - All fields on a single line with no newlines within a line. |
- Each task should return a score in [0, 1]
| |
| Example: |
| [START] task=click-test env=miniwob model=Qwen3-VL-30B |
| [STEP] step=1 action=click('123') reward=0.00 done=false error=null |
| [STEP] step=2 action=fill('456','text') reward=0.00 done=false error=null |
| [STEP] step=3 action=click('789') reward=1.00 done=true error=null |
| [END] success=true steps=3 score=1.00 rewards=0.00,0.00,1.00 |
| """ |
|
|
| import json |
| import os |
| import re |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from openai import OpenAI |
|
|
| from server.Ad_Audit_environment import AdAuditEnv, EPISODE_DAYS |
| from models import AdAuditAction |
|
|
| |
| |
| |
|
|
# --- Runtime configuration (all overridable via environment variables) ---

# Docker image name for from_docker_image() environments; None when unset.
IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
# API key: HF_TOKEN takes precedence, API_KEY is the fallback; None if neither is set.
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")

# OpenAI-compatible endpoint and model (defaults required by the module docstring contract).
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-7B-Instruct")

# Task selection: unrecognised ADAUDIT_TASK values silently fall back to "medium".
_VALID_TASKS = {"easy", "medium", "hard"}
_task_env = os.getenv("ADAUDIT_TASK", "").strip().lower()
# NOTE(review): TASK_NAME appears unused — main() iterates sorted(_VALID_TASKS)
# instead; confirm whether single-task selection was intended.
TASK_NAME = _task_env if _task_env in _VALID_TASKS else "medium"
BENCHMARK = os.getenv("ADAUDIT_BENCHMARK", "adaudit")
TEMPERATURE = 0.0  # deterministic LLM sampling
MAX_TOKENS = 2048  # per-completion token cap
HISTORY_WINDOW = 5  # number of past (user, assistant) turn pairs kept in LLM context
BASELINE_DAYS = 3  # rule-based agent only monitors during these first days
SUCCESS_SCORE_THRESHOLD = 0.4  # episode counts as success when score >= this

# Preferred investigation tools, in order, for each suspected fraud type;
# the rule-based policy runs these before flagging a publisher.
TOOLS_FOR = {
    "bot_traffic": ["click_timestamps", "ip_distribution"],
    "click_injection": ["conversion_quality", "click_timestamps"],
    "domain_spoofing": ["referral_urls", "viewability_scores"],
}

# System prompt sent on every LLM call. It defines the JSON action contract
# the model must reply with, so the string content must not be altered.
SYSTEM_PROMPT = """\
You are an ad fraud analyst monitoring a 14-day campaign with multiple publishers.
Each day you see metrics (impressions, clicks, conversions, CTR, CVR) and take ONE action.

## ACTIONS

Use publisher IDs (pub_001, pub_002, etc.), never publisher names.

1. Monitor — observe metrics, take no action:
{"action_type": "monitor"}
2. Investigate — run one tool on one publisher (costs 1 budget):
{"action_type": "investigate_publisher", "publisher_id": "pub_001", "tool": "click_timestamps"}
3. Flag fraud — flag a publisher as fraudulent (irreversible):
{"action_type": "flag_fraud", "publisher_id": "pub_001", "fraud_type": "bot_traffic", "evidence": ["click_timestamps", "ip_distribution"]}
4. Submit report — end the episode:
{"action_type": "submit_report", "summary": "..."}

Valid tools: click_timestamps, ip_distribution, device_fingerprints, \
referral_urls, viewability_scores, conversion_quality
Valid fraud types: bot_traffic, domain_spoofing, click_injection

## STRATEGY
1. Monitor days 1-3 to establish baselines. Then compare each publisher's CTR/CVR against its own baseline. Only publishers with BIG shifts (CTR doubled, CVR crashed, impressions tripled) are suspicious. Stable metrics = clean.
2. Investigate suspicious publishers with 2 tools before flagging. bot_traffic = CTR spikes + CVR near zero. click_injection = CVR abnormally high (5%+). domain_spoofing = impressions surge + CVR drops.
3. False positives are heavily penalized. When in doubt, monitor.
4. Do not submit report until you are confident you have found and flagged ALL fraudsters.

Respond with ONLY a JSON action in ```json``` markers.
"""
|
|
|
|
| |
| |
| |
|
|
def log_start(task: str, env: str, model: str) -> None:
    """Emit the single [START] line that opens an episode's stdout log."""
    line = " ".join(["[START]", f"task={task}", f"env={env}", f"model={model}"])
    print(line, flush=True)
|
|
|
|
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one [STEP] line; a falsy ``error`` is rendered as the literal ``null``."""
    shown_error = error or "null"
    head = f"[STEP] step={step} action={action}"
    tail = f"reward={reward:.2f} done={str(done).lower()} error={shown_error}"
    print(f"{head} {tail}", flush=True)
|
|
|
|
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the final [END] line; rewards are comma-joined with 2 decimal places."""
    joined = ",".join(format(r, ".2f") for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.2f} rewards={joined}",
        flush=True,
    )
|
|
|
|
| |
| |
| |
|
|
def format_observation(obs_dict: Dict[str, Any], action_log: Optional[List[str]] = None) -> str:
    """Render an environment observation dict as a readable text block for the LLM.

    Sections, in order (each emitted only when present in ``obs_dict``):
    day header, past-action summary, per-publisher metrics table,
    investigation results, publisher flag status, and budget status.

    Args:
        obs_dict: Observation as a plain dict (e.g. from ``obs.model_dump()``).
            Must contain at least ``"day"``.
        action_log: Optional compact history of past actions to show the model.
            (Fixed: annotated ``Optional`` — the original hinted ``List[str]``
            while defaulting to ``None``.)

    Returns:
        The formatted multi-line string (no trailing newline).
    """
    parts: List[str] = []
    parts.append(f"=== DAY {obs_dict['day']} of {obs_dict.get('campaign_day_total', 14)} ===\n")

    if action_log:
        parts.append("Past actions: " + " | ".join(action_log))
        parts.append("")

    if obs_dict.get("daily_metrics"):
        parts.append("Metrics:")
        # Fixed-width columns so the model sees an aligned table.
        parts.append(
            f"{'ID':<10} {'Publisher':<22} {'Impressions':>12} {'Clicks':>8} "
            f"{'Conversions':>12} {'Spend ($)':>10} {'CTR':>7} {'CVR':>7}"
        )
        for m in obs_dict["daily_metrics"]:
            parts.append(
                f"{m['publisher_id']:<10} {m['name']:<22} {m['impressions']:>12,} {m['clicks']:>8,} "
                f"{m['conversions']:>12,} {m['spend']:>10,.2f} {m['ctr']:>6.2%} {m['cvr']:>6.2%}"
            )
        parts.append("")

    if obs_dict.get("investigation_results"):
        inv = obs_dict["investigation_results"]
        if isinstance(inv, dict):
            if "error" in inv:
                parts.append(f"Investigation ERROR: {inv['error']}")
            else:
                # Dump every tool-specific field; tool/publisher_id go in the header.
                parts.append(f"Investigation ({inv.get('tool', '?')}) for {inv.get('publisher_id', '?')}:")
                for k, v in inv.items():
                    if k not in ("tool", "publisher_id"):
                        parts.append(f"  {k}: {v}")
            parts.append("")

    pub_status = obs_dict.get("publisher_status", {})
    if pub_status:
        flagged_pubs = [pid for pid, s in pub_status.items() if s == "flagged"]
        active = [pid for pid, s in pub_status.items() if s == "active"]
        if flagged_pubs:
            parts.append(f"FLAGGED: {', '.join(flagged_pubs)}")
        parts.append(f"Active publishers: {', '.join(active)}")

    b = obs_dict.get("budget_status", {})
    if b:
        parts.append(
            f"Budget: ${b.get('remaining',0):,.0f} remaining | "
            f"Investigations left: {b.get('investigation_budget_remaining', 0)}"
        )
    return "\n".join(parts)
|
|
|
|
| |
| |
| |
|
|
def parse_action(text: str) -> Dict[str, Any]:
    """Extract a JSON action object from an LLM reply.

    Tries, in order:
      1. Every ```json fenced block — the whole fence body is parsed, so
         actions containing nested objects work. (Fixes the original lazy
         ``\\{.*?\\}`` regex, which stopped at the first ``}`` and silently
         degraded any nested-object action to "monitor".)
      2. Any single line that is a bare JSON object, scanning from the end.

    Returns:
        The parsed action dict, or the safe default ``{"action_type": "monitor"}``
        when nothing parses.
    """
    for fenced in re.findall(r"```json\s*(.*?)```", text, re.DOTALL):
        candidate = fenced.strip()
        if candidate.startswith("{"):
            try:
                parsed = json.loads(candidate)
                if isinstance(parsed, dict):
                    return parsed
            except json.JSONDecodeError:
                pass
    # Fallback: last line that looks like a one-line JSON object.
    for line in reversed(text.strip().split("\n")):
        line = line.strip()
        if line.startswith("{") and line.endswith("}"):
            try:
                return json.loads(line)
            except json.JSONDecodeError:
                continue
    # Monitoring costs nothing, so it is the safe default on parse failure.
    return {"action_type": "monitor"}
|
|
|
|
| |
| |
| |
|
|
def get_llm_action(
    client: OpenAI,
    obs_text: str,
    history: List[Dict[str, str]],
) -> Optional[Dict[str, Any]]:
    """Ask the LLM for the next action; return the parsed dict, or None on any failure."""
    # Conversation layout: system prompt, trimmed history, then the fresh observation.
    messages = (
        [{"role": "system", "content": SYSTEM_PROMPT}]
        + list(history)
        + [{"role": "user", "content": obs_text}]
    )

    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        reply = completion.choices[0].message.content or ""
        return parse_action(reply)
    except Exception:
        # Any transport/model error tells the caller to fall back to rules.
        return None
|
|
|
|
| |
| |
| |
|
|
def score_publishers(metrics_history: Dict[str, List[Dict]]) -> List[Tuple[str, float, str]]:
    """Rank publishers by a heuristic suspicion score.

    Compares each publisher's recent (last-3-day) average CTR/CVR against its
    own first-two-day baseline and returns ``(publisher_id, score, fraud_type)``
    tuples for everyone scoring above 0.2, highest score first.
    """
    def _mean(rows: List[Dict], key: str) -> float:
        return sum(r[key] for r in rows) / len(rows)

    ranked: List[Tuple[str, float, str]] = []
    for pub_id, rows in metrics_history.items():
        if len(rows) < 2:
            continue  # not enough data to form a baseline
        avg_ctr = _mean(rows[-3:], "ctr")
        avg_cvr = _mean(rows[-3:], "cvr")
        base_ctr = _mean(rows[:2], "ctr")
        base_cvr = _mean(rows[:2], "cvr")

        suspicion = 0.0
        label = "bot_traffic"

        # CTR spiking above its own baseline.
        if base_ctr > 0:
            rise = avg_ctr / base_ctr
            if rise > 1.5:
                suspicion += 0.4
            if rise > 2.0:
                suspicion += 0.3
        # CVR collapsing relative to its own baseline.
        if base_cvr > 0:
            drop = avg_cvr / base_cvr
            if drop < 0.5:
                suspicion += 0.3
            if drop < 0.2:
                suspicion += 0.3
        # Abnormally high absolute CVR looks like click injection.
        if avg_cvr > 0.04:
            suspicion += 0.5
            label = "click_injection"
        # Moderate CTR rise plus moderate CVR drop hints at domain spoofing,
        # but only when no stronger bot-traffic signal has taken over.
        if base_ctr > 0 and base_cvr > 0:
            rise = avg_ctr / base_ctr
            drop = avg_cvr / base_cvr
            if 1.2 < rise < 2.0 and 0.2 < drop < 0.6 and label == "bot_traffic" and suspicion < 0.5:
                label = "domain_spoofing"

        if suspicion > 0.2:
            ranked.append((pub_id, suspicion, label))

    ranked.sort(key=lambda item: -item[1])
    return ranked
|
|
|
|
def get_rule_action(
    obs_dict: Dict[str, Any],
    metrics_history: Dict[str, List[Dict]],
    investigated: Dict[str, List[str]],
    flagged: set,
) -> Dict[str, Any]:
    """Pick the next action deterministically (no LLM).

    Policy: monitor through the baseline window; after that, flag any suspect
    already backed by at least two investigations, otherwise spend budget
    investigating the most suspicious publisher, otherwise keep monitoring.
    Mutates ``investigated`` and ``flagged`` as bookkeeping side effects.
    """
    if obs_dict["day"] <= BASELINE_DAYS:
        # Still collecting baselines — never act this early.
        return {"action_type": "monitor"}

    remaining_budget = obs_dict.get("budget_status", {}).get("investigation_budget_remaining", 0)

    suspects = [
        (pid, sc, ftype)
        for pid, sc, ftype in score_publishers(metrics_history)
        if pid not in flagged
    ]

    # Flag the first (highest-scoring) suspect that already has enough evidence.
    for pid, _score, ftype in suspects:
        evidence = investigated.get(pid, [])
        if len(evidence) >= 2:
            flagged.add(pid)
            return {
                "action_type": "flag_fraud",
                "publisher_id": pid,
                "fraud_type": ftype,
                "evidence": evidence,
            }

    # Otherwise gather more evidence on the top suspects, budget permitting.
    if remaining_budget > 0:
        for pid, _score, ftype in suspects:
            done_tools = investigated.get(pid, [])
            for tool in TOOLS_FOR.get(ftype, TOOLS_FOR["bot_traffic"]):
                if tool not in done_tools:
                    investigated.setdefault(pid, []).append(tool)
                    return {
                        "action_type": "investigate_publisher",
                        "publisher_id": pid,
                        "tool": tool,
                    }

    return {"action_type": "monitor"}
|
|
|
|
| |
| |
| |
|
|
def run_episode(task_name: str, llm_client: Optional[OpenAI]) -> None:
    """Run one AdAudit episode end-to-end, emitting the [START]/[STEP]/[END] lines.

    Drives the environment with the LLM when ``llm_client`` is usable and
    permanently falls back to the deterministic rule-based policy the first
    time an LLM call fails. The [END] line is always emitted, even on exception.
    """
    use_rules = llm_client is None

    env = AdAuditEnv()

    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    # LLM conversation state: prior (user, assistant) turns and a compact action trail.
    history: List[Dict[str, str]] = []
    action_log: List[str] = []

    # Rule-policy bookkeeping: per-publisher metric history, tools already run,
    # and publishers already flagged.
    metrics_history: Dict[str, List[Dict]] = {}
    investigated: Dict[str, List[str]] = {}
    flagged: set = set()

    log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME if not use_rules else "rule-based")

    try:
        obs = env.reset(episode_id=task_name)
        obs_dict = obs.model_dump()

        while not obs_dict.get("done", False) and steps_taken < EPISODE_DAYS:
            # Accumulate today's metrics so the rule policy can compare against baselines.
            for m in obs_dict.get("daily_metrics", []):
                metrics_history.setdefault(m["publisher_id"], []).append(m)

            action = None

            if not use_rules:
                obs_text = format_observation(obs_dict, action_log)
                action = get_llm_action(llm_client, obs_text, history[-HISTORY_WINDOW * 2:])
                if action is None:
                    # One failed LLM call switches to rules for the rest of the episode.
                    use_rules = True

            if action is None:
                action = get_rule_action(obs_dict, metrics_history, investigated, flagged)

            # Validate against the action schema; anything malformed becomes "invalid".
            try:
                action_obj = AdAuditAction(**action)
            except Exception:
                action_obj = AdAuditAction(action_type="invalid")

            # Compact summary for the action trail, e.g. "D4:flag_fraud(pub_001,bot_traffic)".
            log_entry = f"D{obs_dict['day']}:{action_obj.action_type}"
            if action_obj.publisher_id:
                log_entry += f"({action_obj.publisher_id}"
                if action_obj.tool:
                    log_entry += f",{action_obj.tool}"
                if action_obj.fraud_type:
                    log_entry += f",{action_obj.fraud_type}"
                log_entry += ")"
            action_log.append(log_entry)

            # Record the turn for the LLM's rolling context window.
            if not use_rules:
                history.append({"role": "user", "content": obs_text})
                history.append({"role": "assistant", "content": json.dumps(action)})

            # Advance the environment by one day.
            obs = env.step(action_obj)
            obs_dict = obs.model_dump()
            steps_taken += 1

            reward = obs_dict.get("reward", 0.0)
            done = obs_dict.get("done", False)
            # NOTE(review): error is hard-coded to None, but the module docstring says
            # it should carry the raw last_action_error — confirm whether the
            # observation exposes such a field and wire it through if so.
            error = None
            rewards.append(reward)

            action_str = json.dumps(action, separators=(",", ":"))
            log_step(step=steps_taken, action=action_str, reward=reward, done=done, error=error)

        # Pull the final score from the environment's grader inputs, clamped to [0, 1].
        state = env.state
        grader = state.grader_inputs
        score = grader.get("final_score", 0.0)
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD

    finally:
        # [END] is always emitted, even if the episode raised.
        # NOTE(review): env.close() is never called, although the module docstring's
        # contract says [END] follows env.close() — confirm whether AdAuditEnv needs it.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
|
|
|
|
def main() -> None:
    """Probe the LLM endpoint once, then run every task; falls back to rules if unreachable."""
    llm_client: Optional[OpenAI] = None
    try:
        candidate = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
        candidate.models.list()  # cheap reachability probe
        llm_client = candidate
    except Exception:
        llm_client = None  # unreachable endpoint -> rule-based episodes

    for task in sorted(_VALID_TASKS):
        run_episode(task, llm_client)
|
|
|
|
# Script entry point: run all task episodes when executed directly.
if __name__ == "__main__":
    main()
|
|