gdpr-auditor / inference.py
Charan Sai Mamidala
fix: use 0.001/0.999 bounds β€” 1e-6 formats as 0.0000 in :.4f stdout which validator reads as 0.0
dd054aa
#!/usr/bin/env python3
"""
Baseline Inference Script for GDPR Auditor Environment.
Uses OpenAI Client + HTTP calls to the server to run a model against the environment.
Usage:
python inference.py
Environment Variables:
API_BASE_URL: The API endpoint (default: https://router.huggingface.co/v1)
MODEL_NAME: The model identifier (default: Qwen/Qwen2.5-72B-Instruct)
HF_TOKEN: Your Hugging Face / API key (required)
SERVER_URL: The environment server URL (default: http://localhost:7860)
Expected format for STDOUT:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
"""
import json
import os
import re
import textwrap
import requests
from typing import List, Optional
from dotenv import load_dotenv
load_dotenv()
from openai import OpenAI
# Force reload from .env file to override system env vars
import os
_env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.env')
if os.path.exists(_env_file):
with open(_env_file) as f:
for line in f:
line = line.strip()
if line and '=' in line and not line.startswith('#'):
key, val = line.split('=', 1)
os.environ[key] = val
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.environ.get("HF_TOKEN")
SERVER_URL = os.environ.get("SERVER_URL", "http://localhost:7860")
MAX_STEPS = 8
MAX_TOKENS = 512
TEMPERATURE = 0.7
SUCCESS_SCORE_THRESHOLD = 0.5
BENCHMARK = "gdpr_auditor_env"
SYSTEM_PROMPT = textwrap.dedent(
"""
You are an expert GDPR Compliance Auditor. Your job is to review privacy policies
and data practices to identify compliance violations.
IMPORTANT: This is a multi-step auditing task. You must systematically review
each section of the policy and data practice to find ALL issues.
Task Types:
- Easy (Clause Existence): Check if mandatory GDPR clauses are present
- Medium (Purpose Mapping): Match data practices to stated purposes
- Hard (Dark Patterns): Find contradictions between policy statements and actual practices
Output Format:
Your finding should clearly identify:
- The issue type (missing clause, contradiction, etc.)
- The specific violation
- The severity (if mentioned)
Examples of findings:
- "Missing Right to be Forgotten clause"
- "Health data shared with advertisers - contradicts policy"
- "Section 1 says never shared but Section 3 lists third-party partners"
Output ONLY your finding, nothing else."""
).strip()
def log_start(task, env, model):
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step, action, reward, done, error):
done_str = str(done).lower()
error_str = "null" if error is None else str(error).replace('\n', ' ').replace('\r', '')
print(f"[STEP] step={step} action={action} reward={reward:.4f} done={done_str} error={error_str}", flush=True)
def log_end(success, steps, score, rewards):
success_str = str(success).lower()
rewards_str = ",".join(f"{r:.4f}" for r in rewards)
print(f"[END] success={success_str} steps={steps} score={score:.4f} rewards={rewards_str}", flush=True)
def reset_env(task: str) -> dict:
"""Reset the environment via HTTP."""
resp = requests.get(f"{SERVER_URL}/reset", params={"task": task})
resp.raise_for_status()
return resp.json()
def step_env(message: str) -> dict:
"""Send action to environment via HTTP."""
resp = requests.post(f"{SERVER_URL}/step", json={"message": message})
resp.raise_for_status()
return resp.json()
def build_user_prompt(obs_data: dict, step_num: int) -> str:
documents = obs_data.get("documents", [])
data_practices = obs_data.get("data_practices", [])
requirements = obs_data.get("compliance_requirements", [])
flagged = obs_data.get("flagged_issues", [])
policy_text = documents[0].get("content", "") if documents else "No policy found"
prompt = f"""Privacy Policy Audit (Step {step_num}):
--- POLICY CONTENT ---
{policy_text[:1500]}
--- DATA PRACTICES ---
"""
for dp in data_practices:
prompt += f"- {dp.get('category')}: {dp.get('purpose')} (shared: {dp.get('shared_with_third_parties')})\n"
prompt += f"""
--- COMPLIANCE REQUIREMENTS ---
"""
for req in requirements:
prompt += f"- {req}\n"
if flagged:
prompt += f"\n--- ALREADY FLAGGED ---\n"
for f in flagged:
prompt += f"- {f}\n"
prompt += f"""
Task: {obs_data.get('task_name')} ({obs_data.get('difficulty')})
Provide your next compliance finding:"""
return prompt
def call_model(client: OpenAI, user_prompt: str, history: List[dict]) -> str:
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
messages.extend(history)
messages.append({"role": "user", "content": user_prompt})
try:
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=messages,
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
stream=False,
)
text = (completion.choices[0].message.content or "").strip()
lines = text.split('\n')
for line in lines:
line = line.strip()
if line and len(line) > 10:
return line
return text if text else "No finding submitted"
except Exception as exc:
print(f"[DEBUG] Model request failed: {exc}", flush=True)
return "No finding submitted"
TASKS = {
"easy": {"task_id": "easy_clause_existence", "name": "Clause Existence Check", "difficulty": "easy"},
"medium": {"task_id": "medium_purpose_mapping", "name": "Purpose Mapping", "difficulty": "medium"},
"hard": {"task_id": "hard_dark_patterns", "name": "Dark Pattern Detection", "difficulty": "hard"},
"elite": {"task_id": "elite_multi_doc_reasoning", "name": "Multi-Document Reasoning", "difficulty": "elite"},
}
def run_task(client: OpenAI, task_key: str, verbose: bool = False) -> dict:
"""Run inference on a single task via HTTP."""
task = TASKS[task_key]
task_name = task["name"]
history: List[dict] = []
rewards: List[float] = []
steps_taken = 0
score = 0.0
success = False
error_msg = None
log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
try:
result = reset_env(task_key)
obs_data = result.get("observation", {})
done = result.get("done", False)
for step in range(1, MAX_STEPS + 1):
if done:
break
user_prompt = build_user_prompt(obs_data, step)
response_text = call_model(client, user_prompt, history)
history.append({"role": "assistant", "content": response_text})
action_str = response_text[:60] + "..." if len(response_text) > 60 else response_text
action_log_str = action_str.replace('\n', ' ').replace('\r', '')
try:
result = step_env(response_text)
reward_data = result.get("reward", {})
if not reward_data:
print(f"[DEBUG] No reward in response: {result}", flush=True)
if isinstance(reward_data, dict):
reward = reward_data.get("value", 0.001) # default 0.001, never 0.0
else:
reward = float(reward_data)
done = result.get("done", False)
error_msg = None
obs_data = result.get("observation", {})
except Exception as exc:
error_msg = str(exc).replace('\n', ' ').replace('\r', '')
reward = 0.001 # 1e-6 formats as "0.0000" β€” use 0.001 so :.4f gives "0.0010"
done = True
obs_data = {}
rewards.append(reward)
steps_taken = step
log_step(step=step, action=action_log_str, reward=reward, done=done, error=error_msg)
if done:
break
# 1e-6 formats as "0.0000" with :.4f β€” validator parses that as 0.0 and fails
# Use 0.001 min so it formats as "0.0010", and 0.999 max so it formats as "0.9990"
_MIN_SCORE = 0.001
_MAX_SCORE = 0.999
score = max(rewards) if rewards else _MIN_SCORE
score = max(_MIN_SCORE, min(_MAX_SCORE, score))
success = score >= SUCCESS_SCORE_THRESHOLD
except Exception as exc:
error_msg = str(exc).replace('\n', ' ').replace('\r', '')
print(f"[DEBUG] Task execution error: {error_msg}", flush=True)
finally:
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
return {
"task_id": task["task_id"],
"task_name": task_name,
"score": score,
"success": success,
"steps": steps_taken,
"rewards": rewards,
}
def main():
print("=" * 60)
print("GDPR Auditor - Baseline Inference")
print("=" * 60)
print(f"API URL : {API_BASE_URL}")
print(f"Model : {MODEL_NAME}")
print(f"Server : {SERVER_URL}")
print()
if not HF_TOKEN:
print("ERROR: HF_TOKEN not set")
return
if not API_BASE_URL:
print("ERROR: API_BASE_URL not set")
return
if not MODEL_NAME:
print("ERROR: MODEL_NAME not set")
return
try:
client = OpenAI(
base_url=API_BASE_URL,
api_key=HF_TOKEN,
)
except Exception as e:
print(f"ERROR: Failed to create client: {e}")
return
try:
resp = requests.get(f"{SERVER_URL}/health", timeout=5)
if resp.status_code != 200:
print(f"ERROR: Server returned {resp.status_code}")
return
print("Server connection: OK")
except Exception as e:
print(f"ERROR: Cannot connect to server: {e}")
return
results = []
for task_key in ["easy", "medium", "hard", "elite"]:
task_name = TASKS[task_key]["name"]
print(f"\nRunning task: {task_name} ({task_key})...")
result = run_task(client, task_key)
results.append(result)
print(f" score={result['score']:.4f} steps={result['steps']}")
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
for r in results:
print(f"[{r['task_id'][:1].upper()}] {r['task_id']:<30} score={r['score']:.4f}")
print("-" * 60)
avg = sum(r["score"] for r in results) / len(results)
print(f"Average score: {avg:.4f}")
output = {
"model": MODEL_NAME,
"api_url": API_BASE_URL,
"timestamp": str(__import__("datetime").datetime.now()),
"average_score": avg,
"results": results,
}
with open("inference_results.json", "w") as f:
json.dump(output, f, indent=2)
print(f"\nResults saved to: inference_results.json")
if __name__ == "__main__":
main()