#!/usr/bin/env python3
"""
SafeGen Arena — post-hoc filter for Tier-3 pool (C2 fix).

The aligned Red LLM (Qwen2.5-3B-Instruct) sometimes refuses to produce
adversarial rewrites and emits benign replacements instead. Examples seen:
  - seed "stupid man"        → rewrite "irritated individual"
  - seed "the human psyche"  → rewrite "human mental state"
  - seed "negative"          → rewrite "malicious intent detection techniques"

Those benign strings then get labeled `y=unsafe` because they're in the T3
pool. Blue sees them during GRPO and learns to reject benign phrasing.

This script filters the pool using cheap heuristics so we don't need to hit
the GPU. Smoke test (Apr 24) should *additionally* re-verify each surviving
row with Llama-Guard-3-1B (the proper fix is C2-option-B; this is option A
which gets ~80% of the problem).

Filters applied in order:
  1. Missing fields / empty prompt
  2. prompt length < 30 chars (aligned LLM refusals are almost always short)
  3. SKIP sentinel (from the new category-grounded system prompt)
  4. Refusal / safety hedges (regex on the rewrite)
  5. Seed-copy: rewrite is nearly the original seed (no obfuscation happened)
  6. Benign-meme markers (same patterns as clean_tier1.py)

Usage (from SafeGenArena/):
    python scripts/clean_red_pool.py
    python scripts/clean_red_pool.py --input data/red_tier3_pool.jsonl --backup
"""
from __future__ import annotations

import argparse
import json
import re
import shutil
from pathlib import Path

# Phrases the aligned model uses when soft-refusing. Any match drops the row.
# Matched case-insensitively and per-line (MULTILINE) against the rewrite text.
REFUSAL_MARKERS = [
    r"\bi (cannot|can't|won'?t|am unable to|will not)\b",
    r"\bas an? (ai|assistant|language model)\b",
    r"\bi (apologize|don'?t feel comfortable)\b",
    r"\bi (must|have to) decline\b",
    r"\bagainst my (guidelines|policies|programming)\b",
    r"\bnot appropriate\b",
    r"\bsafe alternative\b",
    r"\bpositive reframing\b",
    r"\binstead of\b.*\b(suggest|propose|offer)\b",
    r"\bSKIP\s*$",  # SKIP sentinel emitted by the category-grounded system prompt
    # Benign, overly vague "rewrites" that clearly dropped the unsafe axis
    r"^[a-z ]{1,40}$",  # single-line lowercase short phrase — not a real image prompt
]
REFUSAL_PATTERN = re.compile("|".join(REFUSAL_MARKERS), re.IGNORECASE | re.MULTILINE)

# Same benign-meme patterns as clean_tier1.py (see module docstring, filter 6).
BENIGN_MEME_MARKERS = [
    r"\bshrek\b",
    r"\bminecraft\b",
    r"\bdreamsmp\b",
    r"\bwojak\b",
    r"\bfunniest\b",
    r"\bfunny meme\b",
    r"\borig(inal)? meme\b",
    r"\bfamily guy\b",
    r"\bsouth park\b",
    r"\bneckbeard\b",
]
BENIGN_PATTERN = re.compile("|".join(BENIGN_MEME_MARKERS), re.IGNORECASE)

# Minimum cosine-similarity-proxy: rewrite must share at least this fraction
# of non-stopword tokens with the seed, OR differ by at least this much in
# length. Otherwise it's either a seed-copy or an off-topic rewrite.
MIN_TOKEN_JACCARD = 0.05  # at least some shared tokens → on-topic
MAX_TOKEN_JACCARD = 0.90  # but not too similar → obfuscation happened

# Common function words ignored by tokenize() so the Jaccard score reflects
# content words only.
STOPWORDS = {
    "the", "a", "an", "of", "and", "or", "to", "in", "on", "with", "at",
    "by", "for", "from", "as", "is", "are", "was", "were", "be", "this",
    "that", "it", "its", "he", "she", "they", "them",
}


def tokenize(text: str) -> set[str]:
    """Lowercase *text* and return its alphanumeric tokens minus STOPWORDS."""
    return {t for t in re.findall(r"[a-z0-9]+", text.lower()) if t not in STOPWORDS}


def jaccard(a: set[str], b: set[str]) -> float:
    """Jaccard similarity of two token sets (two empty sets count as identical)."""
    if not a and not b:
        return 1.0
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def should_drop(row: dict) -> tuple[bool, str]:
    """Decide whether a pool row is a bad rewrite.

    Returns ``(True, reason)`` when the row should be removed, otherwise
    ``(False, "")``. Filters run in the order documented in the module
    docstring; the first matching filter wins.
    """
    prompt = (row.get("prompt") or "").strip()
    seed = (row.get("source_seed") or "").strip()
    if not prompt:
        return True, "empty_prompt"
    # Aligned-LLM refusals are almost always short; real image prompts aren't.
    if len(prompt) < 30:
        return True, "too_short"
    if REFUSAL_PATTERN.search(prompt):
        return True, "refusal_marker"
    if BENIGN_PATTERN.search(prompt):
        return True, "benign_meme"
    if seed:
        j = jaccard(tokenize(prompt), tokenize(seed))
        # Near-copy of the seed: no obfuscation happened, row adds nothing.
        if j > MAX_TOKEN_JACCARD:
            return True, "seed_near_copy"
        # Off-topic: if tokens barely overlap AND prompt is shortish, LLM drifted
        if j < MIN_TOKEN_JACCARD and len(prompt) < 80:
            return True, "off_topic_short"
    return False, ""


def _filter_file(path: Path) -> tuple[list[dict], dict[str, int]]:
    """Read *path* (JSONL) and return (kept rows, drop-reason counts).

    Malformed JSON lines are counted under ``bad_json`` and skipped rather
    than aborting the whole run — this is a best-effort cleanup script.
    """
    kept: list[dict] = []
    drop_reasons: dict[str, int] = {}
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                drop_reasons["bad_json"] = drop_reasons.get("bad_json", 0) + 1
                continue
            drop, reason = should_drop(row)
            if drop:
                drop_reasons[reason] = drop_reasons.get(reason, 0) + 1
                continue
            kept.append(row)
    return kept, drop_reasons


def main() -> None:
    """CLI entry point: filter the pool file in place and print a summary."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=Path, default=Path("data/red_tier3_pool.jsonl"))
    parser.add_argument("--backup", action="store_true",
                        help="Write the original to .bak before overwriting.")
    args = parser.parse_args()

    if not args.input.is_file():
        parser.error(f"input file not found: {args.input}")

    if args.backup:
        shutil.copy2(args.input, args.input.with_suffix(".jsonl.bak"))
        print(f" backup -> {args.input.with_suffix('.jsonl.bak')}")

    kept, drop_reasons = _filter_file(args.input)

    # Write to a sibling temp file and atomically swap it in, so a crash or
    # interrupt mid-write cannot destroy the only copy of the pool.
    tmp = args.input.with_suffix(".jsonl.tmp")
    with tmp.open("w", encoding="utf-8") as f:
        for row in kept:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
    tmp.replace(args.input)

    print(f"\n kept: {len(kept)}")
    print(f" dropped: {sum(drop_reasons.values())}")
    for reason, n in sorted(drop_reasons.items(), key=lambda kv: -kv[1]):
        print(f"   {reason:24s} {n}")

    # Per-category distribution after cleanup
    cats: dict[str, int] = {}
    for r in kept:
        c = r.get("category", "?")
        cats[c] = cats.get(c, 0) + 1
    print(f"\n categories after clean: {cats}")


if __name__ == "__main__":
    main()