# surrogate-1/bin/v2/constitutional-loop.py
# Author: Ashira Pitchayapakayakul
# fix: disable chutes ladder + zero-gpu cold-start retry (commit 7fd3e2c)
"""Surrogate-1 v2 β€” Constitutional self-critique β†’ DPO data generator.
Implements Bai et al. 2022 (Constitutional AI) but specialized for
DevSecOps/SRE/code agents. For each input prompt:
1. Surrogate generates a response.
2. Self-critique against project-specific principles.
3. Revise if any principle flagged.
4. Output (original = rejected, revised = chosen) β†’ DPO pair.
Used as nightly batch. Output appended to:
~/.surrogate/data/v2/constitutional-dpo.jsonl
Run:
python3 constitutional-loop.py --input prompts.jsonl --n 200
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path
# Make the project's shared helper library importable when run as a script.
sys.path.insert(0, str(Path.home() / ".surrogate/bin/lib"))
try:
    # Project sanitizer: decides whether a (prompt, response) pair may be
    # kept in the output dataset. Returns a dict with at least {"keep": bool}.
    from sanitize import filter_pair  # type: ignore
except Exception:
    # Fallback when the sanitizer is unavailable: keep everything, but
    # record why so downstream consumers can tell the data was unfiltered.
    def filter_pair(p, r):  # fallback
        return {"keep": True, "reason": "no-sanitizer"}
# The "constitution": project-specific principles the critic model scores a
# response against. Each entry:
#   name   -- key expected back in the critic's JSON verdict (see critique())
#   check  -- instruction text embedded verbatim in the critique prompt
#   domain -- coarse category tag; not read by this script (presumably used
#             by downstream analysis -- verify against consumers)
PRINCIPLES = [
    {
        "name": "no_phantom_imports",
        "check": ("Does the response import only real, installable packages? "
                  "Flag any phantom modules, hallucinated APIs, or fictional "
                  "library functions."),
        "domain": "code",
    },
    {
        "name": "no_hardcoded_secrets",
        "check": ("Does the response contain hardcoded credentials, API keys, "
                  "tokens, passwords, or connection strings? Flag any leaked "
                  "secrets or examples that look real."),
        "domain": "security",
    },
    {
        "name": "least_privilege",
        "check": ("If IAM/RBAC/permissions are involved, does the response "
                  "follow least-privilege? Flag wildcards (* on Resource or "
                  "Action), admin roles attached to functions, public S3 "
                  "buckets without justification."),
        "domain": "security",
    },
    {
        "name": "input_validation",
        "check": ("If the response handles user input or external data, does "
                  "it validate/sanitize? Flag SQL/command/HTML injection "
                  "vectors, missing parameterized queries, or trusting "
                  "untrusted input."),
        "domain": "security",
    },
    {
        "name": "honest_uncertainty",
        "check": ("If the question requires data the model can't have "
                  "(versioned APIs, internal systems, future events), does "
                  "the response say 'I don't know' or 'verify against docs', "
                  "OR does it confabulate a confident-sounding wrong answer?"),
        "domain": "general",
    },
    {
        "name": "no_internal_path_leak",
        "check": ("Does the response leak internal paths, training-data "
                  "artifacts, or filesystem structures from training? Flag "
                  "/home/hermes/, /data/state/, axentx/ repo IDs, daemon "
                  "names, or 'generated via cerebras:' style headers."),
        "domain": "general",
    },
    {
        "name": "production_ready",
        "check": ("Does the response include error handling, logging, and "
                  "graceful failure? Flag bare exceptions, missing retries on "
                  "external calls, missing timeouts, or 'TODO'/'FIXME' "
                  "placeholders left in shipped code."),
        "domain": "code",
    },
    {
        "name": "specific_to_stack",
        "check": ("Is the answer specific to the user's stack/tooling/version "
                  "or is it generic boilerplate? Flag answers that ignore "
                  "stated tools (e.g., user said Terraform, response uses "
                  "CloudFormation; user said Python 3.12, response uses 2.x)."),
        "domain": "general",
    },
]
def llm_ladder(prompt: str, sys_prompt: str = "",
               max_tokens: int = 1024) -> str:
    """Query the first working LLM bridge in priority order.

    Each bridge is a shell script that reads a JSON request on stdin and
    writes the completion to stdout. Bridges that are missing, exit
    non-zero, time out, raise, or return trivially short output are
    skipped and the next one is tried.

    Args:
        prompt: User prompt text.
        sys_prompt: Optional system prompt.
        max_tokens: Completion budget forwarded to the bridge.

    Returns:
        First non-trivial completion, or "" if every bridge failed.
    """
    bridges = [
        "$HOME/.surrogate/bin/cerebras-bridge.sh",
        "$HOME/.surrogate/bin/groq-bridge.sh",
        "$HOME/.surrogate/bin/openrouter-bridge.sh",
        "$HOME/.surrogate/bin/gemini-bridge.sh",
        # "$HOME/.surrogate/bin/chutes-bridge.sh",  # disabled 2026-04-30: chutes 402 free-tier dead
        "$HOME/.surrogate/bin/ollama-bridge.sh",
    ]
    # Request body is loop-invariant: build it once, not per bridge.
    req = json.dumps({"system": sys_prompt, "prompt": prompt,
                      "max_tokens": max_tokens, "temperature": 0.3})
    for sh in bridges:
        sh_path = os.path.expandvars(sh)
        if not Path(sh_path).exists():
            continue
        try:
            r = subprocess.run(["bash", sh_path], input=req,
                               capture_output=True, text=True, timeout=60)
            # Fix: a failing bridge may still print a long error message on
            # stdout -- don't mistake that for a completion.
            if r.returncode != 0:
                continue
            out = (r.stdout or "").strip()
            # Heuristic: very short output is an error blurb, not a reply.
            if out and len(out) > 30:
                return out
        except Exception:
            continue  # best-effort ladder: fall through to the next bridge
    return ""
def critique(prompt: str, response: str) -> dict:
    """Run all principles. Returns {flags: [name], details: {name: text}}.

    Asks the critic LLM for a JSON verdict over every PRINCIPLES entry.
    `flags` lists the names of violated principles; `details` is the raw
    per-principle verdict. On an unparseable verdict, returns no flags
    (the pair is simply skipped upstream) with the raw text preserved
    under "_parse_error" for debugging.
    """
    sys_p = ("You are a security and quality reviewer. For EACH principle, "
             "answer YES (satisfied) or NO (violated) and give a 1-sentence "
             "reason. Return ONLY JSON: {\"<name>\": {\"ok\": bool, "
             "\"why\": str}, ...}.")
    p_block = "\n".join(f"- {p['name']}: {p['check']}" for p in PRINCIPLES)
    # Truncate both sides to keep the critique prompt within budget.
    user_p = (f"PROMPT:\n{prompt[:1500]}\n\nRESPONSE:\n{response[:3000]}\n\n"
              f"PRINCIPLES:\n{p_block}\n\nReturn JSON only.")
    raw = llm_ladder(user_p, sys_p, max_tokens=600)
    try:
        s = raw.strip()
        if s.startswith("```"):
            # Unwrap a markdown code fence and drop a literal "json"
            # language tag. Fix: the old .lstrip("json") stripped a
            # *character set* (any of j/s/o/n), which can silently eat
            # leading payload characters -- strip the exact prefix instead.
            s = s.split("```")[1]
            if s.startswith("json"):
                s = s[len("json"):]
            s = s.strip()
        verdict = json.loads(s)
        # A principle is flagged only on an explicit {"ok": false} verdict.
        flags = [k for k, v in verdict.items()
                 if isinstance(v, dict) and v.get("ok") is False]
        return {"flags": flags, "details": verdict}
    except Exception:
        return {"flags": [], "details": {"_parse_error": raw[:300]}}
def revise(prompt: str, response: str, flags: list[str],
           details: dict) -> str:
    """Ask the LLM to rewrite `response`, fixing the flagged violations.

    Args:
        prompt: Original user prompt (truncated to 1500 chars in the ask).
        response: The response to revise (truncated to 3000 chars).
        flags: Names of violated principles from critique().
        details: Per-principle verdicts; "why" text is quoted back.

    Returns:
        The revised response; the original unchanged when nothing was
        flagged or the reviser produced no output.
    """
    if not flags:
        return response
    weaknesses = [f"- {fl}: {details.get(fl, {}).get('why', 'flagged')}"
                  for fl in flags]
    # Fix: repaired mojibake in the system prompt ("β€”" was a
    # double-encoded em dash).
    sys_p = ("You are Surrogate-1. Revise the response to fix all listed "
             "principle violations. Keep what was correct. Output only the "
             "revised response — no preamble.")
    user_p = (f"PROMPT:\n{prompt[:1500]}\n\nORIGINAL:\n{response[:3000]}\n\n"
              f"VIOLATIONS:\n" + "\n".join(weaknesses) +
              "\n\nFix all and output revised response.")
    # Fall back to the original response if every bridge failed.
    return llm_ladder(user_p, sys_p, max_tokens=1500) or response
def process_prompt(prompt: str, response: str | None = None) -> dict | None:
    """Build one DPO pair (rejected=original, chosen=revised), or None.

    None means one of: no response could be generated, no principle was
    violated, the revision changed nothing, or the sanitizer rejected
    the pair.
    """
    # Use the supplied response if any; otherwise generate one.
    answer = response or llm_ladder(
        prompt, "You are Surrogate-1, an expert coding/devops agent.",
        max_tokens=1024)
    if not answer:
        return None

    verdict = critique(prompt, answer)
    flags = verdict["flags"]
    if not flags:
        return None  # clean response -> nothing to learn from

    improved = revise(prompt, answer, flags, verdict["details"])
    # Revision must exist and actually differ from the original.
    if not improved or improved.strip() == answer.strip():
        return None
    if not filter_pair(prompt, improved)["keep"]:
        return None

    return {
        "prompt": prompt,
        "chosen": improved,
        "rejected": answer,
        "violated": flags,
        "details": verdict["details"],
        "ts": int(time.time()),
    }
def main() -> None:
    """CLI entry point: stream prompts JSONL in, append DPO pairs to --out.

    Stops after --n kept pairs. Malformed input lines are skipped.
    Exits 1 when the input file does not exist.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True,
                    help="JSONL with {prompt, response?} per line")
    ap.add_argument("--out", default=str(
        Path.home() / ".surrogate/data/v2/constitutional-dpo.jsonl"))
    ap.add_argument("--n", type=int, default=200)
    args = ap.parse_args()

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    inp = Path(args.input)
    if not inp.exists():
        # Fix: repaired mojibake ("❌" was a double-encoded ❌).
        print(f"❌ input not found: {inp}", file=sys.stderr)
        sys.exit(1)

    n_in = 0    # parsed input lines scanned
    n_kept = 0  # DPO pairs written
    # Fix: pin both streams to UTF-8 -- output is written with
    # ensure_ascii=False, so locale-default encodings can crash on
    # non-ASCII prompts/responses.
    with open(inp, encoding="utf-8") as fin, \
         open(out_path, "a", encoding="utf-8") as fout:
        for line in fin:
            if n_kept >= args.n:
                break
            try:
                d = json.loads(line)
            except Exception:
                continue  # best-effort batch: skip malformed lines
            n_in += 1
            triple = process_prompt(d.get("prompt", ""), d.get("response"))
            if triple:
                fout.write(json.dumps(triple, ensure_ascii=False) + "\n")
                fout.flush()  # keep partial progress if the run dies
                n_kept += 1
                if n_kept % 10 == 0:
                    print(f" kept {n_kept}/{args.n} (scanned {n_in})")
    print(f"[done] in={n_in} dpo_pairs={n_kept} out={out_path}")
if __name__ == "__main__":
main()