# surrogate-1/bin/lib/sanitize.py
# Author: Ashira Pitchayapakayakul
# Last commit b772ad8: rename: drop '-lora-' segment from all model names
#                      + capitalize v1.5 size
"""Surrogate-1 training data sanitizer β€” drops rows that would leak internals.
Discovered 2026-04-29: v1 LoRA leaked path "/home/hermes/.surrogate/state/orchestrate/77426592/1-README.md"
and "# generated via cerebras:llama3.1-8b" tag in inference response. Root cause: dataset-mirror
+ dataset-enrich ingested rows where the response was an LLM-generated artifact still tagged
with provider attribution + internal filesystem context.
Apply this filter at row-level BEFORE the row is added to training-pairs.jsonl. Drop entire
row if either prompt or response matches any high-risk pattern.
"""
import re
# Patterns that indicate the row contains Surrogate-1 internal pollution.
# All of these are OR-joined into POLLUTION_RE below and compiled with
# MULTILINE + IGNORECASE, so `^` anchors match at any interior line start
# and case never matters. Ordered most-specific first: when alternatives
# could match at the same position, the earlier one wins, which keeps the
# logged match snippet as informative as possible.
POLLUTION_PATTERNS = [
    # 1. LLM provider attribution lines — e.g. "# generated via cerebras:…"
    #    tags left in llm-burst-generator outputs (the v1 leak's root cause,
    #    per the module docstring).
    r"^\s*#\s*generated\s+via\s+(cerebras|groq|openrouter|gemini|chutes|samba|kimi|nvidia|hf-router|hf-llama|hf-qwen|hf-mistral)",
    r"<<\s*generated by\s+(cerebras|groq|openrouter|gemini|chutes|samba|kimi)",
    r"\[(cerebras|groq|openrouter|gemini|chutes|samba|kimi):[a-z0-9.-]+\]",
    # 2. Internal filesystem paths — would expose host directory structure.
    r"/home/hermes/(?:\.surrogate|\.hermes|\.codex|\.gemini|\.kaggle)/",
    r"/data/(?:state|logs|memory|skills|sessions|workspace|projects|ollama|training|reflexion|index|surrogate)/",
    r"~/\.surrogate/(?:state|logs|memory|bin|skills|sessions)/",
    r"/Users/Ashira/(?:\.surrogate|\.hermes|\.kaggle|\.oci|\.note|develope|axentx)/",
    # 3. Internal state-management directory names.
    r"\bstate/orchestrate/\d+/",
    r"\bagentic-discovery/",
    r"\braw-mirrors/[a-z0-9-]+/",
    r"\benriched/[a-z0-9-]+/",
    r"\bbatches/(?:public-merged|mirror-merged)/\d{4}-\d{2}-\d{2}/",
    # 4. Pipeline daemon names, with or without a .sh/.py suffix.
    r"\b(?:dataset-mirror|dataset-enrich|llm-burst-generator|bulk-ingest-parallel|"
    r"agentic-crawler|github-agentic-crawler|skill-synthesis-daemon|push-training-to-hf|"
    r"surrogate-orchestrate|self-heal-watchdog|hermes-status-server|hermes-discord-bot)"
    r"(?:\.sh|\.py)?\b",
    # 5. Specific axentx repo identifiers — the model shouldn't reproduce these.
    r"axentx/surrogate-1-(?:training-pairs|pairs-[A-D]|coder-[a-zA-Z0-9-]+-v[\d.]+)",
    # 6. Token / secret-shaped strings (provider API keys, leaked credentials).
    r"\b(?:hf_[A-Za-z0-9]{30,}|sk-or-v\d-[A-Za-z0-9]{40,}|sk-ant-[A-Za-z0-9-]{30,}|"
    r"KGAT_[A-Za-z0-9]{30,}|csk-[A-Za-z0-9]{40,}|cpk_[A-Za-z0-9.]{40,}|"
    r"gsk_[A-Za-z0-9]{40,}|nvapi-[A-Za-z0-9_-]{40,}|fc-[a-f0-9]{32}|"
    r"ghp_[A-Za-z0-9]{30,}|sbp_[a-f0-9]{40}|cfut_[A-Za-z0-9]{30,}|"
    r"AIzaSy[A-Za-z0-9_-]{30,}|xai-[A-Za-z0-9]{40,}|r8_[A-Za-z0-9]{30,}|rnd_[A-Za-z0-9]{20,}|"
    r"sk-kimi-[A-Za-z0-9]{40,})\b",
    # 7. Env-var names assigned/echoed with a long value — debug or
    #    introspection output leaking into a response.
    r"\b(?:LIGHTNING_USER_ID|LIGHTNING_API_KEY|HF_TOKEN|KAGGLE_API_TOKEN|KAGGLE_KEY|"
    r"OPENROUTER_API_KEY|CEREBRAS_API_KEY|GROQ_API_KEY|ANTHROPIC_API_KEY)\s*[=:]\s*['\"]?[A-Za-z0-9_-]{20,}",
    # 8. Discord webhook URLs (id + token — a live credential).
    r"https://discord\.com/api/webhooks/\d+/[A-Za-z0-9_-]+",
    # 9. Commit-message prefixes used by the daemons pushing to HF.
    r"^(?:enriched|mirror|chunk):\s+",
    r"^train-ready pusher:",
    r"^clean mirror(?:\s+final)?:",
    # 10. JWT-shaped strings: base64url header.payload.signature
    #     (NVIDIA Brev tokens, etc.).
    r"\beyJ[A-Za-z0-9_-]{50,}\.eyJ[A-Za-z0-9_-]{100,}\.[A-Za-z0-9_=-]{40,}",
]
# Single compiled alternation: MULTILINE so the `^`-anchored patterns hit
# interior lines, IGNORECASE because tags/paths appear in mixed case.
POLLUTION_RE = re.compile("|".join(f"(?:{p})" for p in POLLUTION_PATTERNS),
                          re.MULTILINE | re.IGNORECASE)
def is_polluted(text: str) -> tuple[bool, str | None]:
    """Scan a single field for internal-pollution markers.

    Returns (True, snippet) where snippet is the first matched substring
    truncated to 120 chars — log it to see which pattern family fired and
    to tune POLLUTION_PATTERNS later. Returns (False, None) when the text
    is clean, empty, or not a string at all.
    """
    if not isinstance(text, str) or not text:
        return False, None
    hit = POLLUTION_RE.search(text)
    if hit is None:
        return False, None
    return True, hit.group(0)[:120]
def is_polluted_pair(prompt: str, response: str) -> tuple[bool, str | None]:
    """Row-level check: the pair is dropped when either side is polluted.

    Returns (True, "<field>: <snippet>") naming the offending field
    (prompt checked first), or (False, None) when both sides are clean.
    """
    for field, value in (("prompt", prompt), ("response", response)):
        bad, snippet = is_polluted(value)
        if bad:
            return True, f"{field}: {snippet}"
    return False, None
# Optional: fast regex-level PII screen, applied alongside the pollution
# filter (the starpii NER further down is the deeper, optional pass).
PII_PATTERNS = [
    # Email addresses. The TLD class must be [A-Za-z]: a '|' inside a
    # character class is a literal pipe, not alternation (previous
    # [A-Z|a-z] accepted pipes in the "TLD").
    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    # Phone numbers (US + basic international forms)
    r"\b\+?\d{1,3}[\s.-]?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b",
    # US Social Security numbers
    r"\b\d{3}-\d{2}-\d{4}\b",
    # AWS access key IDs
    r"\bAKIA[0-9A-Z]{16}\b",
    # Stripe secret keys
    r"\bsk_(?:test|live)_[A-Za-z0-9]{32,}\b",
]
PII_RE = re.compile("|".join(PII_PATTERNS), re.IGNORECASE)
def has_pii(text: str) -> bool:
    """True when the regex PII screen finds anything in *text* (None-safe)."""
    return PII_RE.search(text or "") is not None
# ── Optional NER + secrets scanners (lazy, fail-soft) ──────────────────
# starpii (BigCode) — neural PII NER; better than regex for free-form text.
# detect-secrets (Yelp) — entropy + plugin-based secret detector.
# Both are optional dependencies; if unavailable we fall back to regex above.
#
# Caches for the lazy loaders below. Tri-state: None = load not yet
# attempted, False = attempted and failed (never retry), anything else =
# the successfully loaded object.
_starpii_pipeline = None
_detect_secrets_collection = None
def _load_starpii():
    """Lazily construct the bigcode/starpii token-classification pipeline.

    Caches the outcome in the module-level ``_starpii_pipeline``: the
    pipeline object on success, the ``False`` sentinel after a failed
    attempt (so the expensive import/model load is never retried).
    Returns the pipeline, or None when it is unavailable.
    """
    global _starpii_pipeline
    if _starpii_pipeline is None:  # first call: actually try to load
        try:
            from transformers import pipeline  # type: ignore
            _starpii_pipeline = pipeline(
                "token-classification",
                model="bigcode/starpii",
                aggregation_strategy="simple",
            )
        except Exception:
            _starpii_pipeline = False  # remember the failure
    return None if _starpii_pipeline is False else _starpii_pipeline
def starpii_pii_hits(text: str, threshold: float = 0.8) -> list[dict]:
    """NER-based PII scan via bigcode/starpii, when installed.

    Returns [{type, score, span}] for detected spans whose confidence is
    at least *threshold*; span text is truncated to 120 chars. Returns []
    when the model is unavailable, *text* is empty, or inference raises
    (fail-soft by design).
    """
    pipe = _load_starpii()
    if not pipe or not text:
        return []
    try:
        raw = pipe(text[:4000])  # cap input for speed
    except Exception:
        return []
    results = []
    for h in raw:
        if h.get("score", 0) >= threshold:
            results.append({
                "type": h["entity_group"],
                "score": float(h["score"]),
                "span": text[h["start"]:h["end"]][:120],
            })
    return results
def _load_detect_secrets():
    """Lazily import detect-secrets.

    On success caches and returns the tuple
    ``(SecretsCollection, default_settings)``; after a failed import the
    ``False`` sentinel is cached and None is returned from then on.
    """
    global _detect_secrets_collection
    if _detect_secrets_collection is None:  # first call: try the import
        try:
            from detect_secrets import SecretsCollection  # type: ignore
            from detect_secrets.settings import default_settings  # type: ignore
            _detect_secrets_collection = (SecretsCollection, default_settings)
        except Exception:
            _detect_secrets_collection = False  # remember the failure
    return None if _detect_secrets_collection is False else _detect_secrets_collection
def detect_secrets_hits(text: str) -> list[dict]:
    """Run Yelp's detect-secrets scanner over *text*.

    Returns [{type, line, secret_hash}] for every finding (hash truncated
    to 16 chars); empty list when the package is not installed, *text* is
    empty, or scanning fails — fail-soft so a scanner crash never blocks
    the pipeline.
    """
    loaded = _load_detect_secrets()
    if not loaded or not text:
        return []
    SecretsCollection, default_settings = loaded
    import tempfile, os
    # detect-secrets scans files, not strings, so round-trip via tempfile.
    fd, path = tempfile.mkstemp(suffix=".txt")
    try:
        try:
            # Cap at 200 KB so pathological rows don't stall the scan.
            os.write(fd, text.encode("utf-8", "ignore")[:200_000])
        finally:
            # Close even if the write raises (the fd used to leak here).
            os.close(fd)
        with default_settings():
            sc = SecretsCollection()
            sc.scan_file(path)
            return [{"type": s.type, "line": s.line_number,
                     "secret_hash": s.secret_hash[:16]}
                    for secrets in sc.data.values() for s in secrets]
    except Exception:
        return []  # fail-soft: treat scanner errors as "no findings"
    finally:
        try:
            os.unlink(path)
        except OSError:
            pass
# Quality heuristics β€” drop if response is too short, identical to prompt, etc.
def is_low_quality(prompt: str, response: str) -> tuple[bool, str | None]:
if not prompt or not response:
return True, "empty"
if len(prompt) < 20:
return True, "prompt_too_short"
if len(response) < 30:
return True, "response_too_short"
if response.strip().lower() == prompt.strip().lower():
return True, "response_equals_prompt"
# Detect when response is just an apology / refusal
if re.match(r"^\s*i('?m| am)?\s+(sorry|afraid|unable|cannot|cant|can't)\b",
response.strip(), re.IGNORECASE):
return True, "refusal"
# Repeated character spam
if re.search(r"(.)\1{50,}", response):
return True, "char_spam"
return False, None
def filter_pair(prompt: str, response: str,
                deep_scan: bool = False) -> dict:
    """Full row verdict: {'keep': bool, 'reason': str|None, 'matched': str|None}.

    The cheap regex checks (pollution, PII, quality heuristics) always
    run. With deep_scan=True the starpii NER and detect-secrets scanners
    run as well, when installed — slow (model load + per-row inference),
    so reserve it for the final pre-train pass rather than every dedup row.
    """
    def _drop(reason, matched=None):
        # Uniform rejection record.
        return {"keep": False, "reason": reason, "matched": matched}

    polluted, detail = is_polluted_pair(prompt, response)
    if polluted:
        return _drop("polluted", detail)
    if has_pii(prompt) or has_pii(response):
        return _drop("pii_regex")
    low, why = is_low_quality(prompt, response)
    if low:
        return _drop(f"low_quality:{why}")
    if deep_scan:
        # Run each optional scanner over both fields; NER first, then
        # entropy/plugin secret detection. First hit wins.
        for label, scanner in (("pii_ner", starpii_pii_hits),
                               ("secrets", detect_secrets_hits)):
            for field, txt in (("prompt", prompt), ("response", response)):
                hits = scanner(txt)
                if hits:
                    return _drop(f"{label}:{field}", str(hits[:3])[:300])
    return {"keep": True, "reason": None, "matched": None}
# CLI helper for testing: pipe a JSON row (or raw text) on stdin and get
# the filter verdict as pretty-printed JSON on stdout.
if __name__ == "__main__":
    import sys, json
    # Fallback demo row (a known-polluted sample) when stdin is a TTY.
    _DEMO = """{"prompt": "fix bug", "response": "# generated via cerebras:llama3.1-8b\\nReadFile path /home/hermes/.surrogate/state/orchestrate/77426592/1-README.md"}"""
    sample = _DEMO if sys.stdin.isatty() else sys.stdin.read()
    if sample.strip().startswith("{"):
        obj = json.loads(sample)
    else:
        obj = {"prompt": "test", "response": sample}
    verdict = filter_pair(obj.get("prompt", ""), obj.get("response", ""))
    print(json.dumps(verdict, indent=2))