bochen2079/katherine-k0 / logs /prep_dataset.py
bochen2079's picture
download
raw
6.41 kB
"""
prep_dataset.py — build canonical k0 datasets from raw Katherine JSONL exports.
Input: C:\\Katherine\\JSONLs\\*.jsonl (~38 files, ~6,164 lines, mixed schema)
Output: data/k0_canonical.jsonl — SFT examples, deduped, system-prompts stripped
data/k0_dpo_curated.jsonl — DPO pairs from k0_dpo_only.jsonl, prompts system-stripped
Why strip system prompts:
See CLOUD.md § "Why unconditional Katherine". Short version: training with
the K0 system prompt teaches `P(K_output | sysprompt_K)` — the model learns
to be Katherine WHEN PROMPTED. Stripping makes it `P(K_output | nothing)` —
the model unconditionally IS Katherine. Robust against sysprompt-removal,
more honest as a persona deployment.
Run:
python prep_dataset.py
python prep_dataset.py --src /path/to/JSONLs --out-dir data/
Idempotent: re-running overwrites the canonical files atomically.
"""
import argparse
import hashlib
import json
import os
import sys
from pathlib import Path
def load_unique_sft(src_dir: Path, exclude_files: set) -> dict:
    """Load every SFT example found under *src_dir*, deduplicated by content.

    Scans the ``*.jsonl`` files directly inside *src_dir* in sorted filename
    order. Blank lines, lines that are not valid JSON, and records that are
    not a JSON object carrying a non-empty ``messages`` list are skipped
    instead of raising, so one stray record cannot abort the whole scan.

    Args:
        src_dir: Directory holding the raw ``.jsonl`` exports.
        exclude_files: File names (not full paths) to leave out of the scan.

    Returns:
        Dict mapping the MD5 hex digest of the key-sorted JSON encoding of a
        ``messages`` list to that list. Duplicate conversations collapse to
        a single entry.
    """
    unique = {}
    for f in sorted(src_dir.glob("*.jsonl")):
        if f.name in exclude_files:
            continue
        # errors="replace" keeps a badly encoded byte from aborting the scan.
        with open(f, "r", encoding="utf-8", errors="replace") as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line can be valid JSON without being an object (a bare
                # number, string or array); such lines carry no SFT example.
                if not isinstance(obj, dict):
                    continue
                messages = obj.get("messages")
                # Downstream code iterates the turns, so only a non-empty
                # list is usable here.
                if not messages or not isinstance(messages, list):
                    continue
                # sort_keys makes the digest independent of key order inside
                # each message dict.
                msgs_str = json.dumps(messages, sort_keys=True, ensure_ascii=False)
                h = hashlib.md5(msgs_str.encode("utf-8")).hexdigest()
                unique[h] = messages
    return unique
def strip_system_messages(messages: list) -> list:
    """Return a new list with every ``role == "system"`` message removed.

    The surviving messages keep their relative order, and the input list is
    not modified.
    """
    kept = []
    for msg in messages:
        if msg.get("role") == "system":
            continue
        kept.append(msg)
    return kept
def is_valid_sft(messages: list) -> bool:
    """Tell whether *messages* is usable as an SFT example.

    A usable conversation is non-empty, opens with a ``user`` turn, and has
    at least one ``assistant`` turn somewhere in it.
    """
    if not messages:
        return False
    opens_with_user = messages[0].get("role") == "user"
    # `and` short-circuits, so later turns are inspected only when the first
    # turn is a user turn.
    return opens_with_user and any(
        turn.get("role") == "assistant" for turn in messages
    )
def write_jsonl_atomic(path: Path, items: list) -> None:
    """Write *items* to *path* as JSONL via a temp file that is then renamed.

    Each item is serialized with ``json.dumps(..., ensure_ascii=False)`` on
    its own line into ``<path>.tmp``. That file replaces *path* only once
    every item has been written, so a partial file never appears at the
    final path. If anything fails along the way, the temp file is removed
    and any existing file at *path* is left untouched.

    Args:
        path: Final destination; missing parent directories are created.
        items: JSON-serializable objects, one per output line.

    Raises:
        TypeError: If an item is not JSON-serializable.
        OSError: If the file cannot be written or renamed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with open(tmp, "w", encoding="utf-8") as fp:
            for item in items:
                fp.write(json.dumps(item, ensure_ascii=False) + "\n")
        tmp.replace(path)
    except BaseException:
        # Do not leave a half-written temp file behind; the original error is
        # what the caller needs to see, so cleanup failures are ignored.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
def build_dpo_curated(src_file: Path) -> list:
    """Build DPO preference pairs from *src_file* with system turns removed.

    Reads the file line by line. Blank lines, lines that are not valid JSON,
    lines whose JSON value is not an object, and records with a missing or
    empty ``prompt``, ``chosen`` or ``rejected`` are skipped. System messages
    are stripped from each prompt; a record whose prompt becomes empty is
    dropped as well.

    Args:
        src_file: Path to the JSONL file of preference records.

    Returns:
        List of ``{"prompt", "chosen", "rejected"}`` dicts in file order.
        ``chosen`` and ``rejected`` are passed through unchanged.
    """
    out = []
    with open(src_file, "r", encoding="utf-8", errors="replace") as fp:
        for line in fp:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A bare number, string or array is valid JSON but has no
            # .get(); treat it like any other malformed line.
            if not isinstance(obj, dict):
                continue
            prompt = obj.get("prompt", [])
            chosen = obj.get("chosen", [])
            rejected = obj.get("rejected", [])
            if not (prompt and chosen and rejected):
                continue
            # NOTE(review): assumes `prompt` is a list of role dicts; a plain
            # string prompt would fail inside strip_system_messages.
            prompt_stripped = strip_system_messages(prompt)
            if not prompt_stripped:
                continue
            out.append({
                "prompt": prompt_stripped,
                "chosen": chosen,
                "rejected": rejected,
            })
    return out
def _print_sft_stats(examples: list) -> None:
    """Print turn-count and character-length percentiles for *examples*.

    Each example is a ``{"messages": [...]}`` dict. An empty corpus prints a
    short notice instead of indexing into an empty list.
    """
    print()
    print("=== SFT corpus stats ===")
    if not examples:
        print(" (no valid SFT examples; nothing to summarize)")
        return
    turn_counts = {}
    char_lens = []
    for ex in examples:
        msgs = ex["messages"]
        n = len(msgs)
        turn_counts[n] = turn_counts.get(n, 0) + 1
        # `content` may be present but null; count that as zero characters.
        char_lens.append(sum(len(m.get("content") or "") for m in msgs))
    char_lens.sort()
    nn = len(char_lens)
    p50 = char_lens[nn // 2]
    # int(nn * 0.99) is always < nn, so the index stays in range for nn >= 1.
    p99 = char_lens[int(nn * 0.99)]
    for n in sorted(turn_counts):
        print(f" {n}-turn: {turn_counts[n]}")
    print(f" char-len p50: {p50}")
    print(f" char-len p99: {p99}")
    print(f" approx tokens (chars/4) p50: {p50//4}")
    print(f" approx tokens (chars/4) p99: {p99//4}")


def main():
    """Command-line entry point: build the canonical SFT and DPO datasets.

    Parses ``--src``, ``--out-dir`` and ``--dpo-source-file``, writes
    ``k0_canonical.jsonl`` (plus ``k0_dpo_curated.jsonl`` when the DPO source
    file exists) into the output directory, then prints corpus statistics.
    Exits with status 1 if the source directory does not exist.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--src", default=r"C:\Katherine\JSONLs",
                   help="Source directory of raw Katherine JSONL files.")
    p.add_argument("--out-dir", default="data",
                   help="Output directory for canonical datasets.")
    p.add_argument("--dpo-source-file", default="k0_dpo_only.jsonl",
                   help="Filename within --src that holds the curated DPO pairs.")
    args = p.parse_args()

    src_dir = Path(args.src)
    if not src_dir.is_dir():
        print(f"[ERROR] source directory not found: {src_dir}", file=sys.stderr)
        sys.exit(1)
    out_dir = Path(args.out_dir)

    # === SFT ===
    # The DPO source file is excluded so it is not scanned for SFT examples.
    print(f"[sft] scanning {src_dir} (excluding {args.dpo_source_file})")
    sft_unique = load_unique_sft(src_dir, exclude_files={args.dpo_source_file})
    print(f"[sft] {len(sft_unique)} unique SFT examples after dedup")
    sft_stripped = []
    for msgs in sft_unique.values():
        clean = strip_system_messages(msgs)
        # Stripping can leave a conversation empty or starting mid-dialogue.
        if is_valid_sft(clean):
            sft_stripped.append({"messages": clean})
    print(f"[sft] {len(sft_stripped)} valid examples after stripping system prompts")
    out_sft = out_dir / "k0_canonical.jsonl"
    write_jsonl_atomic(out_sft, sft_stripped)
    print(f"[sft] wrote {out_sft} ({os.path.getsize(out_sft)} bytes)")

    # === DPO ===
    dpo_src_file = src_dir / args.dpo_source_file
    if dpo_src_file.is_file():
        dpo_pairs = build_dpo_curated(dpo_src_file)
        print(f"[dpo] {len(dpo_pairs)} curated DPO pairs after stripping system from prompts")
        out_dpo = out_dir / "k0_dpo_curated.jsonl"
        write_jsonl_atomic(out_dpo, dpo_pairs)
        print(f"[dpo] wrote {out_dpo} ({os.path.getsize(out_dpo)} bytes)")
    else:
        print(f"[dpo] WARN: {dpo_src_file} not found; skipping DPO build")

    # === Stats ===
    _print_sft_stats(sft_stripped)
    print()
    print("[done] datasets ready under", out_dir)


if __name__ == "__main__":
    main()

Xet Storage Details

Size:
6.41 kB
·
Xet hash:
00780b4682578e0da8140c567fcff480a4682389c4f3fe2810d289c8afbb25ec

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.