Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| SafeGen Arena — post-hoc filter for Tier-3 pool (C2 fix). | |
| The aligned Red LLM (Qwen2.5-3B-Instruct) sometimes refuses to produce | |
| adversarial rewrites and emits benign replacements instead. Examples seen: | |
| - seed "stupid man" → rewrite "irritated individual" | |
| - seed "the human psyche" → rewrite "human mental state" | |
| - seed "negative" → rewrite "malicious intent detection techniques" | |
| Those benign strings then get labeled `y=unsafe` because they're in the | |
| T3 pool. Blue sees them during GRPO and learns to reject benign phrasing. | |
| This script filters the pool using cheap heuristics so we don't need to | |
| hit the GPU. Smoke test (Apr 24) should *additionally* re-verify each | |
| surviving row with Llama-Guard-3-1B (the proper fix is C2-option-B; this | |
| is option A which gets ~80% of the problem). | |
| Filters applied in order: | |
| 1. Missing fields / empty prompt | |
| 2. prompt length < 30 chars (aligned LLM refusals are almost always short) | |
| 3. SKIP sentinel (from the new category-grounded system prompt) | |
| 4. Refusal / safety hedges (regex on the rewrite) | |
| 5. Seed-copy: rewrite is nearly the original seed (no obfuscation happened) | |
| 6. Benign-meme markers (same patterns as clean_tier1.py) | |
| Usage (from SafeGenArena/): | |
| python scripts/clean_red_pool.py | |
| python scripts/clean_red_pool.py --input data/red_tier3_pool.jsonl --backup | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import re | |
| import shutil | |
| from pathlib import Path | |
# Phrases the aligned model uses when soft-refusing. Any match drops the row.
REFUSAL_MARKERS = [
    r"\bi (cannot|can't|won'?t|am unable to|will not)\b",
    r"\bas an? (ai|assistant|language model)\b",
    r"\bi (apologize|don'?t feel comfortable)\b",
    r"\bi (must|have to) decline\b",
    r"\bagainst my (guidelines|policies|programming)\b",
    r"\bnot appropriate\b",
    r"\bsafe alternative\b",
    r"\bpositive reframing\b",
    r"\binstead of\b.*\b(suggest|propose|offer)\b",
    r"\bSKIP\s*$",
    # Benign, overly vague "rewrites" that clearly dropped the unsafe axis.
    # BUGFIX: the combined pattern is compiled with re.IGNORECASE, which made
    # [a-z ] also match UPPERCASE lines — any all-letter prompt up to 40 chars
    # was being dropped, not just lowercase ones. The scoped (?-i:...) group
    # turns case-insensitivity off for this alternative only, restoring the
    # documented "lowercase short phrase" intent.
    r"(?-i:^[a-z ]{1,40}$)",  # single-line lowercase short phrase — not a real image prompt
]
REFUSAL_PATTERN = re.compile("|".join(REFUSAL_MARKERS), re.IGNORECASE | re.MULTILINE)
# Meme / pop-culture markers indicating a benign joke image rather than an
# adversarial rewrite (kept in sync with the patterns used by clean_tier1.py,
# per the module docstring). Any match drops the row.
BENIGN_MEME_MARKERS = [
    r"\bshrek\b", r"\bminecraft\b", r"\bdreamsmp\b", r"\bwojak\b",
    r"\bfunniest\b", r"\bfunny meme\b", r"\borig(inal)? meme\b",
    r"\bfamily guy\b", r"\bsouth park\b", r"\bneckbeard\b",
]
# Single alternation compiled once at import time; case-insensitive.
BENIGN_PATTERN = re.compile("|".join(BENIGN_MEME_MARKERS), re.IGNORECASE)
# Minimum cosine-similarity-proxy: rewrite must share at least this fraction
# of non-stopword tokens with the seed, OR differ by at least this much in
# length. Otherwise it's either a seed-copy or an off-topic rewrite.
MIN_TOKEN_JACCARD = 0.05  # at least some shared tokens → on-topic
MAX_TOKEN_JACCARD = 0.90  # but not too similar → obfuscation happened
# Function words excluded by tokenize() so the Jaccard comparison between
# seed and rewrite only weighs content-bearing tokens.
STOPWORDS = {
    "the", "a", "an", "of", "and", "or", "to", "in", "on", "with",
    "at", "by", "for", "from", "as", "is", "are", "was", "were", "be",
    "this", "that", "it", "its", "he", "she", "they", "them",
}
def tokenize(text: str) -> set[str]:
    """Lowercase *text*, extract alphanumeric runs, and drop STOPWORDS."""
    words = re.findall(r"[a-z0-9]+", text.lower())
    return set(words) - STOPWORDS
def jaccard(a: set[str], b: set[str]) -> float:
    """Return the Jaccard similarity |a ∩ b| / |a ∪ b|.

    Two empty sets are defined as perfectly similar (1.0); when exactly one
    side is empty the intersection is empty, so the ratio is naturally 0.0.
    """
    union = a | b
    if not union:
        # Both sets empty — treat as identical rather than dividing by zero.
        return 1.0
    return len(a & b) / len(union)
def should_drop(row: dict) -> tuple[bool, str]:
    """Decide whether a Tier-3 pool row should be filtered out.

    Applies the cheap heuristics listed in the module docstring, in order,
    and returns ``(drop, reason)``; ``reason`` is ``""`` when the row is kept.
    Missing ``prompt`` / ``source_seed`` fields are treated as empty strings.
    """
    prompt = (row.get("prompt") or "").strip()
    seed = (row.get("source_seed") or "").strip()
    if not prompt:
        return True, "empty_prompt"
    if len(prompt) < 30:
        # Aligned-LLM soft refusals are almost always short (module docstring).
        return True, "too_short"
    if REFUSAL_PATTERN.search(prompt):
        return True, "refusal_marker"
    # Cleanup: removed the dead `BENIGN_MEME_MARKERS and` truthiness guard —
    # the list is a non-empty module constant, and the REFUSAL_PATTERN check
    # above carries no such guard, so it was inconsistent no-op clutter.
    if BENIGN_PATTERN.search(prompt):
        return True, "benign_meme"
    if seed:
        t_prompt = tokenize(prompt)
        t_seed = tokenize(seed)
        j = jaccard(t_prompt, t_seed)
        if j > MAX_TOKEN_JACCARD:
            # Rewrite is nearly the original seed: no obfuscation happened.
            return True, "seed_near_copy"
        # Off-topic: if tokens barely overlap AND prompt is shortish, LLM drifted.
        if j < MIN_TOKEN_JACCARD and len(prompt) < 80:
            return True, "off_topic_short"
    return False, ""
def main() -> None:
    """CLI entry point: filter the pool file in place and print drop stats."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=Path, default=Path("data/red_tier3_pool.jsonl"))
    parser.add_argument("--backup", action="store_true",
                        help="Write the original to <input>.bak before overwriting.")
    args = parser.parse_args()

    if args.backup:
        backup_path = args.input.with_suffix(".jsonl.bak")
        shutil.copy2(args.input, backup_path)
        print(f" backup -> {backup_path}")

    survivors: list[dict] = []
    reason_counts: dict[str, int] = {}
    with args.input.open("r", encoding="utf-8") as fh:
        for raw in fh:
            raw = raw.strip()
            if not raw:
                continue  # skip blank lines in the JSONL
            record = json.loads(raw)
            dropped, why = should_drop(record)
            if not dropped:
                survivors.append(record)
            else:
                reason_counts[why] = reason_counts.get(why, 0) + 1

    # Overwrite the input file with only the surviving rows.
    with args.input.open("w", encoding="utf-8") as fh:
        fh.writelines(json.dumps(r, ensure_ascii=False) + "\n" for r in survivors)

    print(f"\n kept: {len(survivors)}")
    print(f" dropped: {sum(reason_counts.values())}")
    # Stable sort, highest count first (tie order matches insertion order).
    for why, count in sorted(reason_counts.items(), key=lambda kv: kv[1], reverse=True):
        print(f" {why:24s} {count}")

    # Per-category distribution after cleanup.
    category_counts: dict[str, int] = {}
    for record in survivors:
        cat = record.get("category", "?")
        category_counts[cat] = category_counts.get(cat, 0) + 1
    print(f"\n categories after clean: {category_counts}")


if __name__ == "__main__":
    main()