Spaces:
Runtime error
Runtime error
Ashira Pitchayapakayakul
v11(into-model): add 9 ingest datasets + Phase 0 hygiene + TruthRL ternary GRPO
a71a56a | #!/usr/bin/env python3 | |
| """Extract real engineer↔assistant turns from ~/.claude/projects/*.jsonl | |
| session files into chunked text suitable for the distiller. | |
| Each Claude Code session = a JSONL of message events. We extract pairs of | |
| (user-text, assistant-text) where the assistant gave a substantive technical | |
| answer. Output: text chunks (one per file) ready to pipe to distiller.py. | |
| Skip: | |
| - tool-use messages (we want the engineer↔expert dialogue) | |
| - very short turns (<50 chars) | |
| - turns where assistant just acknowledged or asked clarification (caught only when such replies fall under the 50-char cutoff; there is no separate check) | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| from pathlib import Path | |
# Root directory that main() scans recursively for *.jsonl session files.
PROJECTS = Path.home().joinpath(".claude", "projects")
def extract_pairs_from_session(jsonl_path: Path) -> list[tuple[str, str]]:
    """Return (user_msg, assistant_response) pairs found in one session file.

    The file is read as UTF-8 JSONL, one event per line. Lines that are not
    valid JSON, events that are not JSON objects, and events whose
    ``"message"`` is missing or not an object are skipped individually, so a
    single odd line does not discard the rest of the session.

    For each message the text is taken from ``content``: a string is used
    as-is, a list is reduced to its ``{"type": "text"}`` blocks joined with
    newlines (other block types are dropped), and anything else is passed
    through ``str()``. Messages whose resulting text is shorter than 50
    characters are ignored regardless of role.

    A pair is emitted when an assistant message follows a remembered user
    message; the remembered user message is then cleared, so each user
    message is paired at most once.

    Args:
        jsonl_path: Path to the session ``.jsonl`` file.

    Returns:
        The pairs in file order; an empty list if the file does not exist.
    """
    if not jsonl_path.exists():
        return []
    pairs: list[tuple[str, str]] = []
    last_user: str | None = None
    # Explicit UTF-8 so the result does not depend on the platform's locale
    # encoding; undecodable bytes are replaced rather than raising.
    raw = jsonl_path.read_text(encoding="utf-8", errors="replace")
    for line in raw.splitlines():
        try:
            ev = json.loads(line)
        except (ValueError, RecursionError):
            # Blank, truncated or otherwise unparseable line.
            continue
        # Valid JSON is not necessarily an object (e.g. ``null`` or a list).
        if not isinstance(ev, dict):
            continue
        msg = ev.get("message")
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        content = msg.get("content", "")
        # Normalise content (sometimes it's a list of {type:text, text:..} blocks)
        if isinstance(content, list):
            text_parts = []
            for block in content:
                if isinstance(block, dict) and block.get("type") == "text":
                    text = block.get("text", "")
                    # A non-string "text" would make the join below raise.
                    text_parts.append(text if isinstance(text, str) else "")
            content = "\n".join(text_parts)
        elif not isinstance(content, str):
            content = str(content)
        if len(content) < 50:
            continue
        if role == "user":
            last_user = content
        elif role == "assistant" and last_user:
            pairs.append((last_user, content))
            last_user = None
    return pairs
def session_to_text(jsonl_path: Path, max_pair_chars: int = 4000) -> str:
    """Render one session's extracted pairs as a single Markdown-style string.

    The output starts with a ``# Session: <file stem>`` header followed by one
    ``## User`` / ``## Assistant`` section per pair. Only the first 50 pairs
    are rendered, and each side of a pair is clipped to half of
    ``max_pair_chars`` characters.

    Args:
        jsonl_path: Session file handed to ``extract_pairs_from_session``.
        max_pair_chars: Character budget per pair, split evenly between the
            user text and the assistant text.

    Returns:
        The formatted text, or ``""`` when the session yields no pairs.
    """
    pairs = extract_pairs_from_session(jsonl_path)
    if not pairs:
        return ""
    side_limit = max_pair_chars // 2
    header = [f"# Session: {jsonl_path.stem}", ""]
    sections = [
        f"## User\n{user_msg[:side_limit]}\n\n## Assistant\n{reply[:side_limit]}\n"
        for user_msg, reply in pairs[:50]  # cap per session
    ]
    return "\n".join(header + sections)
def main() -> int:
    """Convert session files under PROJECTS into one text file per session.

    Command-line options:
        --out-dir    Directory that receives the ``.md`` files (created if
                     missing).
        --limit      Cap on the number of session files processed, taken
                     from the sorted file list (0 = all).
        --min-pairs  Skip sessions with fewer than this many user-assistant
                     pairs.

    Each kept session is written to
    ``<out-dir>/<parent dir name>__<file stem>.md`` as UTF-8. A session whose
    extraction or formatting raises is reported on stderr and skipped; the
    run continues with the next file.

    Returns:
        0, used as the process exit status.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--out-dir", default=str(Path.home() / ".surrogate/state/v10-ingest/conversations/text"))
    p.add_argument("--limit", type=int, default=0,
                   help="cap on number of session files (0 = all)")
    p.add_argument("--min-pairs", type=int, default=2,
                   help="skip sessions with fewer than N user-assistant pairs")
    args = p.parse_args()
    out = Path(args.out_dir)
    out.mkdir(parents=True, exist_ok=True)
    files = sorted(PROJECTS.rglob("*.jsonl"))
    if args.limit:
        files = files[:args.limit]
    print(f"scanning {len(files)} session files in {PROJECTS}")
    n_written = 0
    for fp in files:
        # Both calls re-read the file, so both sit inside the best-effort
        # guard: a failure in either skips this session instead of aborting
        # the whole run.
        try:
            pairs = extract_pairs_from_session(fp)
            txt = session_to_text(fp) if len(pairs) >= args.min_pairs else ""
        except Exception as e:
            sys.stderr.write(f" skip {fp.name}: {e}\n")
            continue
        if not txt:
            continue
        out_file = out / (fp.parent.name + "__" + fp.stem + ".md")
        # Explicit UTF-8: transcripts routinely contain non-ASCII text, which
        # a non-UTF-8 locale encoding cannot always represent.
        out_file.write_text(txt, encoding="utf-8")
        n_written += 1
        if n_written % 20 == 0:
            print(f" wrote {n_written} session texts ({len(txt)} chars latest)")
    print(f"DONE: wrote {n_written} session texts to {out}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())