"""Surrogate-1 training data sanitizer — drops rows that would leak internals. Discovered 2026-04-29: v1 LoRA leaked path "/home/hermes/.surrogate/state/orchestrate/77426592/1-README.md" and "# generated via cerebras:llama3.1-8b" tag in inference response. Root cause: dataset-mirror + dataset-enrich ingested rows where the response was an LLM-generated artifact still tagged with provider attribution + internal filesystem context. Apply this filter at row-level BEFORE the row is added to training-pairs.jsonl. Drop entire row if either prompt or response matches any high-risk pattern. """ import re # Patterns that indicate the row contains Surrogate-1 internal pollution. # Order: most-specific first so re.search short-circuits on hits. POLLUTION_PATTERNS = [ # 1. LLM provider attribution lines — often added by llm-burst-generator outputs r"^\s*#\s*generated\s+via\s+(cerebras|groq|openrouter|gemini|chutes|samba|kimi|nvidia|hf-router|hf-llama|hf-qwen|hf-mistral)", r"<<\s*generated by\s+(cerebras|groq|openrouter|gemini|chutes|samba|kimi)", r"\[(cerebras|groq|openrouter|gemini|chutes|samba|kimi):[a-z0-9.-]+\]", # 2. Internal filesystem paths — exposing host structure r"/home/hermes/(?:\.surrogate|\.hermes|\.codex|\.gemini|\.kaggle)/", r"/data/(?:state|logs|memory|skills|sessions|workspace|projects|ollama|training|reflexion|index|surrogate)/", r"~/\.surrogate/(?:state|logs|memory|bin|skills|sessions)/", r"/Users/Ashira/(?:\.surrogate|\.hermes|\.kaggle|\.oci|\.note|develope|axentx)/", # 3. Internal directory names (state-management dirs) r"\bstate/orchestrate/\d+/", r"\bagentic-discovery/", r"\braw-mirrors/[a-z0-9-]+/", r"\benriched/[a-z0-9-]+/", r"\bbatches/(?:public-merged|mirror-merged)/\d{4}-\d{2}-\d{2}/", # 4. Daemon names + commit messages from our pipeline r"\b(?:dataset-mirror|dataset-enrich|llm-burst-generator|bulk-ingest-parallel|" r"agentic-crawler|github-agentic-crawler|skill-synthesis-daemon|push-training-to-hf|" r"surrogate-orchestrate|self-heal-watchdog|hermes-status-server|hermes-discord-bot)" r"(?:\.sh|\.py)?\b", # 5. Specific axentx repo identifiers — model shouldn't reproduce these r"axentx/surrogate-1-(?:training-pairs|pairs-[A-D]|coder-[a-zA-Z0-9-]+-v[\d.]+)", # 6. Token / secret-shaped strings (leaked credentials) r"\b(?:hf_[A-Za-z0-9]{30,}|sk-or-v\d-[A-Za-z0-9]{40,}|sk-ant-[A-Za-z0-9-]{30,}|" r"KGAT_[A-Za-z0-9]{30,}|csk-[A-Za-z0-9]{40,}|cpk_[A-Za-z0-9.]{40,}|" r"gsk_[A-Za-z0-9]{40,}|nvapi-[A-Za-z0-9_-]{40,}|fc-[a-f0-9]{32}|" r"ghp_[A-Za-z0-9]{30,}|sbp_[a-f0-9]{40}|cfut_[A-Za-z0-9]{30,}|" r"AIzaSy[A-Za-z0-9_-]{30,}|xai-[A-Za-z0-9]{40,}|r8_[A-Za-z0-9]{30,}|rnd_[A-Za-z0-9]{20,}|" r"sk-kimi-[A-Za-z0-9]{40,})\b", # 7. Common debug / introspection leakage (when LLM was asked to echo state) r"\b(?:LIGHTNING_USER_ID|LIGHTNING_API_KEY|HF_TOKEN|KAGGLE_API_TOKEN|KAGGLE_KEY|" r"OPENROUTER_API_KEY|CEREBRAS_API_KEY|GROQ_API_KEY|ANTHROPIC_API_KEY)\s*[=:]\s*['\"]?[A-Za-z0-9_-]{20,}", # 8. Discord webhook URLs r"https://discord\.com/api/webhooks/\d+/[A-Za-z0-9_-]+", # 9. Internal commit messages (from daemons pushing to HF) r"^(?:enriched|mirror|chunk):\s+", r"^train-ready pusher:", r"^clean mirror(?:\s+final)?:", # 10. JWT-shaped strings (NVIDIA Brev tokens, etc.) r"\beyJ[A-Za-z0-9_-]{50,}\.eyJ[A-Za-z0-9_-]{100,}\.[A-Za-z0-9_=-]{40,}", ] POLLUTION_RE = re.compile("|".join(f"(?:{p})" for p in POLLUTION_PATTERNS), re.MULTILINE | re.IGNORECASE) def is_polluted(text: str) -> tuple[bool, str | None]: """Return (polluted?, matching_pattern_id_for_log). Use the matched substring (truncated) so you can log which type of pollution caused the drop. Useful for tuning patterns later. """ if not text or not isinstance(text, str): return False, None m = POLLUTION_RE.search(text) if m: return True, m.group(0)[:120] return False, None def is_polluted_pair(prompt: str, response: str) -> tuple[bool, str | None]: """Check both fields. Drop the row if either is polluted.""" p_bad, p_match = is_polluted(prompt) if p_bad: return True, f"prompt: {p_match}" r_bad, r_match = is_polluted(response) if r_bad: return True, f"response: {r_match}" return False, None # Optional: PII regex set (apply alongside) PII_PATTERNS = [ # Email r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", # Phone (US/intl basic) r"\b\+?\d{1,3}[\s.-]?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b", # SSN r"\b\d{3}-\d{2}-\d{4}\b", # AWS keys r"\bAKIA[0-9A-Z]{16}\b", # Stripe keys r"\bsk_(?:test|live)_[A-Za-z0-9]{32,}\b", ] PII_RE = re.compile("|".join(PII_PATTERNS), re.IGNORECASE) def has_pii(text: str) -> bool: return bool(PII_RE.search(text or "")) # ── Optional NER + secrets scanners (lazy, fail-soft) ────────────────── # starpii (BigCode) — neural PII NER; better than regex for free-form text. # detect-secrets (Yelp) — entropy + plugin-based secret detector. # Both are optional dependencies; if unavailable we fall back to regex above. _starpii_pipeline = None _detect_secrets_collection = None def _load_starpii(): """Lazy-load BigCode/starpii pipeline. None on failure.""" global _starpii_pipeline if _starpii_pipeline is not None: return _starpii_pipeline if _starpii_pipeline is not False else None try: from transformers import pipeline # type: ignore _starpii_pipeline = pipeline( "token-classification", model="bigcode/starpii", aggregation_strategy="simple", ) return _starpii_pipeline except Exception: _starpii_pipeline = False # sentinel: "tried, don't try again" return None def starpii_pii_hits(text: str, threshold: float = 0.8) -> list[dict]: """Return [{type, score, span}] for confidently-detected PII spans. Empty list if starpii not installed or no hits. """ pipe = _load_starpii() if not pipe or not text: return [] try: hits = pipe(text[:4000]) # cap input for speed except Exception: return [] return [{"type": h["entity_group"], "score": float(h["score"]), "span": text[h["start"]:h["end"]][:120]} for h in hits if h.get("score", 0) >= threshold] def _load_detect_secrets(): """Lazy-load detect-secrets SecretsCollection. None on failure.""" global _detect_secrets_collection if _detect_secrets_collection is not None: return _detect_secrets_collection if _detect_secrets_collection is not False else None try: from detect_secrets import SecretsCollection # type: ignore from detect_secrets.settings import default_settings # type: ignore _detect_secrets_collection = (SecretsCollection, default_settings) return _detect_secrets_collection except Exception: _detect_secrets_collection = False return None def detect_secrets_hits(text: str) -> list[dict]: """Return [{type, line}] for any secret detect-secrets finds. Empty list if not installed or none detected. """ loaded = _load_detect_secrets() if not loaded or not text: return [] SecretsCollection, default_settings = loaded import tempfile, os fd, path = tempfile.mkstemp(suffix=".txt") try: os.write(fd, text.encode("utf-8", "ignore")[:200_000]) os.close(fd) with default_settings(): sc = SecretsCollection() sc.scan_file(path) out = [] for _, secrets in sc.data.items(): for s in secrets: out.append({"type": s.type, "line": s.line_number, "secret_hash": s.secret_hash[:16]}) return out except Exception: return [] finally: try: os.unlink(path) except OSError: pass # Quality heuristics — drop if response is too short, identical to prompt, etc. def is_low_quality(prompt: str, response: str) -> tuple[bool, str | None]: if not prompt or not response: return True, "empty" if len(prompt) < 20: return True, "prompt_too_short" if len(response) < 30: return True, "response_too_short" if response.strip().lower() == prompt.strip().lower(): return True, "response_equals_prompt" # Detect when response is just an apology / refusal if re.match(r"^\s*i('?m| am)?\s+(sorry|afraid|unable|cannot|cant|can't)\b", response.strip(), re.IGNORECASE): return True, "refusal" # Repeated character spam if re.search(r"(.)\1{50,}", response): return True, "char_spam" return False, None def filter_pair(prompt: str, response: str, deep_scan: bool = False) -> dict: """Return verdict: {'keep': bool, 'reason': str|None, 'matched': str|None}. deep_scan=True: also runs starpii NER + detect-secrets if installed. Slow (model load + per-row scan) — use for the final pre-train pass, not for every dedup row. Heuristic (regex) checks always run. """ polluted, p_match = is_polluted_pair(prompt, response) if polluted: return {"keep": False, "reason": "polluted", "matched": p_match} if has_pii(prompt) or has_pii(response): return {"keep": False, "reason": "pii_regex", "matched": None} low_q, lq_reason = is_low_quality(prompt, response) if low_q: return {"keep": False, "reason": f"low_quality:{lq_reason}", "matched": None} if deep_scan: # NER PII for field, txt in (("prompt", prompt), ("response", response)): hits = starpii_pii_hits(txt) if hits: return {"keep": False, "reason": f"pii_ner:{field}", "matched": str(hits[:3])[:300]} # detect-secrets entropy/plugins for field, txt in (("prompt", prompt), ("response", response)): hits = detect_secrets_hits(txt) if hits: return {"keep": False, "reason": f"secrets:{field}", "matched": str(hits[:3])[:300]} return {"keep": True, "reason": None, "matched": None} # CLI helper for testing if __name__ == "__main__": import sys, json sample = sys.stdin.read() if not sys.stdin.isatty() else """{"prompt": "fix bug", "response": "# generated via cerebras:llama3.1-8b\\nReadFile path /home/hermes/.surrogate/state/orchestrate/77426592/1-README.md"}""" obj = json.loads(sample) if sample.strip().startswith("{") else {"prompt": "test", "response": sample} v = filter_pair(obj.get("prompt", ""), obj.get("response", "")) print(json.dumps(v, indent=2))