surrogate-1 / bin /v3 /conversation-extractor.py
Ashira Pitchayapakayakul
v11(into-model): add 9 ingest datasets + Phase 0 hygiene + TruthRL ternary GRPO
a71a56a
#!/usr/bin/env python3
"""Extract real engineer↔assistant turns from ~/.claude/projects/*.jsonl
session files into chunked text suitable for the distiller.
Each Claude Code session = a JSONL of message events. We extract pairs of
(user-text, assistant-text) where the assistant gave a substantive technical
answer. Output: text chunks (one per file) ready to pipe to distiller.py.
Skip:
- tool-use messages (we want the engineer↔expert dialogue)
- very short turns (<50 chars)
- turns where assistant just acknowledged or asked clarification
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
# Root directory that is searched recursively for Claude Code session *.jsonl files.
PROJECTS = Path.home().joinpath(".claude/projects")
def extract_pairs_from_session(jsonl_path: Path) -> list[tuple[str, str]]:
    """Return the (user_msg, assistant_response) pairs found in one session file.

    Each line of the file is parsed as a JSON event. The text of the event's
    ``message.content`` is taken either directly (when it is a string) or by
    joining the ``text`` of every ``{"type": "text"}`` block (when it is a
    list), so non-text blocks such as tool calls contribute nothing. Messages
    whose text is shorter than 50 characters are ignored. A pair is emitted
    when an assistant message follows a remembered user message; the
    remembered user message is then cleared.

    Args:
        jsonl_path: Path to a JSONL session file.

    Returns:
        The pairs in file order, or an empty list when the file does not exist.
        Lines that are not valid JSON, or that are not JSON objects carrying a
        ``message`` object, are skipped rather than aborting the whole file.
    """
    if not jsonl_path.exists():
        return []
    pairs: list[tuple[str, str]] = []
    last_user = None
    # Iterate the file handle instead of str.splitlines(): splitlines() also
    # breaks on U+2028/U+2029/U+0085 and friends, which may appear unescaped
    # inside JSON strings and would cut a valid event into unparseable halves.
    with jsonl_path.open(encoding="utf-8", errors="replace") as fh:
        for line in fh:
            try:
                ev = json.loads(line)
            except (ValueError, RecursionError):
                continue
            # A line holding a bare list/string/number is valid JSON but not an event.
            if not isinstance(ev, dict):
                continue
            msg = ev.get("message")
            if not isinstance(msg, dict):
                continue
            role = msg.get("role")
            content = msg.get("content", "")
            # Normalise content (sometimes it's a list of {type:text, text:..} blocks)
            if isinstance(content, list):
                text_parts = []
                for block in content:
                    if isinstance(block, dict) and block.get("type") == "text":
                        text = block.get("text", "")
                        # A non-string "text" would make str.join raise.
                        text_parts.append(text if isinstance(text, str) else "")
                content = "\n".join(text_parts)
            if not isinstance(content, str):
                content = str(content)
            if len(content) < 50:
                continue
            if role == "user":
                last_user = content
            elif role == "assistant" and last_user:
                pairs.append((last_user, content))
                last_user = None
    return pairs
def session_to_text(jsonl_path: Path, max_pair_chars: int = 4000) -> str:
    """Render one session's extracted pairs as a single text blob for the distiller.

    Args:
        jsonl_path: Session file handed to ``extract_pairs_from_session``.
        max_pair_chars: Character budget per pair; each side of the pair is
            clipped to half of it.

    Returns:
        A ``# Session: <stem>`` header followed by one ``## User`` /
        ``## Assistant`` section per pair (at most the first 50 pairs), or an
        empty string when the session yields no pairs.
    """
    turns = extract_pairs_from_session(jsonl_path)
    if not turns:
        return ""
    # The per-pair budget is split evenly between the user and assistant text.
    half = max_pair_chars // 2
    header = [f"# Session: {jsonl_path.stem}", ""]
    sections = [
        f"## User\n{question[:half]}\n\n## Assistant\n{answer[:half]}\n"
        for question, answer in turns[:50]  # cap per session
    ]
    return "\n".join(header + sections)
def main() -> int:
    """Scan PROJECTS for session files and write one text file per usable session.

    Command-line options:
        --out-dir    directory that receives the ``.md`` files (created if missing)
        --limit      cap on the number of session files scanned (0 = all)
        --min-pairs  sessions with fewer user-assistant pairs are skipped

    Each output file is named ``<parent-dir-name>__<session-stem>.md``. A
    session that fails to parse or render is reported on stderr and skipped so
    that one bad file does not abort the batch.

    Returns:
        0, used as the process exit status.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--out-dir", default=str(Path.home() / ".surrogate/state/v10-ingest/conversations/text"))
    p.add_argument("--limit", type=int, default=0,
                   help="cap on number of session files (0 = all)")
    p.add_argument("--min-pairs", type=int, default=2,
                   help="skip sessions with fewer than N user-assistant pairs")
    args = p.parse_args()
    out = Path(args.out_dir)
    out.mkdir(parents=True, exist_ok=True)
    # Sorted so that --limit selects a reproducible subset.
    files = sorted(PROJECTS.rglob("*.jsonl"))
    if args.limit:
        files = files[:args.limit]
    print(f"scanning {len(files)} session files in {PROJECTS}")
    n_written = 0
    for fp in files:
        try:
            pairs = extract_pairs_from_session(fp)
            if len(pairs) < args.min_pairs:
                continue
            # session_to_text reads the file again, so it can fail as well;
            # keep it under the same per-file skip handling.
            txt = session_to_text(fp)
        except Exception as e:
            sys.stderr.write(f" skip {fp.name}: {e}\n")
            continue
        if not txt:
            continue
        out_file = out / (fp.parent.name + "__" + fp.stem + ".md")
        # Explicit UTF-8: session text routinely holds non-ASCII characters and
        # the locale default encoding may be unable to encode them.
        out_file.write_text(txt, encoding="utf-8")
        n_written += 1
        if n_written % 20 == 0:
            print(f" wrote {n_written} session texts ({len(txt)} chars latest)")
    print(f"DONE: wrote {n_written} session texts to {out}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())