Ashira Pitchayapakayakul committed
Commit e161478 · 1 Parent(s): 8056cbe

feat(harvest): lift source-side length caps 6K/8K → 100K/200K chars

User intent, "take all of it" (เอาทั้งหมดมาเลย): capture full-length pairs from upstream
datasets so long-context training material (SWE-Bench traces, Toucan
agent loops, OpenCodeReasoning derivations, multi-file IaC, full
stacktraces) is preserved end-to-end.

Old caps were 6000 prompt + 8000 response chars (~3.5K tokens combined).
That truncated meaningful long-context signal at HARVEST time, before
any downstream stage could see it. The measured distribution of the
currently harvested data shows max=1486 tokens and p99=1364, i.e. the
6K/8K cap, not the natural data shape, was the binding constraint.
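
A minimal sketch of how that length distribution can be re-measured over
one harvested JSONL shard. The "prompt"/"response" field names match the
records written by the workers in this commit; the shard path and the
~4 chars/token heuristic are illustrative assumptions, not pipeline code:

# Sketch: approximate length percentiles for one harvested JSONL shard.
import json

def length_stats(path: str) -> dict:
    tok_estimates = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            row = json.loads(line)
            chars = len(row.get("prompt", "")) + len(row.get("response", ""))
            tok_estimates.append(chars // 4)  # rough chars-to-tokens estimate
    if not tok_estimates:
        return {"n": 0}
    tok_estimates.sort()
    return {
        "n": len(tok_estimates),
        "p99": tok_estimates[int(0.99 * (len(tok_estimates) - 1))],
        "max": tok_estimates[-1],
    }

print(length_stats("data/harvest/shard-000.jsonl"))  # assumed path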

New caps:
prompt 100,000 chars (~25K tokens)
response 200,000 chars (~50K tokens)
combined ≤ ~75K tokens — covers SWE-Bench (≤30K typical), Toucan
(≤8K), OpenCodeReasoning-2 (≤15K), full IaC modules. Outer bound
preserved to prevent runaway storage on full-repo dumps.
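
The token figures above follow the usual ~4 characters/token rule of
thumb; a quick sanity check of that arithmetic (the ratio is an assumed
heuristic, not a tokenizer measurement):

PROMPT_CAP_CHARS = 100_000
RESPONSE_CAP_CHARS = 200_000
CHARS_PER_TOKEN = 4  # rough heuristic

prompt_tokens = PROMPT_CAP_CHARS // CHARS_PER_TOKEN      # ~25K
response_tokens = RESPONSE_CAP_CHARS // CHARS_PER_TOKEN  # ~50K
print(prompt_tokens + response_tokens)                   # ~75K combined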

Updated 10 scripts:
HARVEST (writes to bulk-mirror jsonl + commits to HF dataset shards):
bin/v2/streaming-mirror-worker.sh primary streaming HF mirror
bin/v2/bulk-mirror-worker.sh legacy non-streaming HF download
bin/v2/build-data-pipeline.sh curated SFT/Tools/Agent/DPO matrix
bin/dataset-mirror.sh alternate mirror path
bin/parquet-direct-ingest.sh direct parquet ingest

SYNTHETIC (generates pairs that flow into dataset):
bin/v2/magpie-self-instruct.py Magpie self-instruct synthesis
bin/v2/tool-trace-collector.py agent tool-use trace capture
bin/v2/verify-trace-generator.py verification trace synthesis
bin/v2/self-refine-loop.py self-refinement loop output
bin/v2/sdft-trainer.py SDFT distilled output writer

NOT changed:
- Existing 921M pairs already harvested under old cap stay as-is
  (dedup keys on prompt SHA-256, so re-harvest skips dupes; getting
  longer versions of legacy rows would need a dedup-DB rebuild; see
  the sketch after this list).
- Training-time seq_len cap unchanged: v1.5 stays seq=4K (Kaggle
T4×2 budget binding constraint), v2 stays seq=16K (H200+72B
budget). Long pairs > those caps get truncated at training time
by axolotl's data collator — that's the correct layer to drop
them, not at harvest.
- len(p)<20 / len(r)<30 minimum-length filter retained (drops
trivial garbage rows).
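
A minimal sketch of the prompt-keyed SHA-256 dedup check referenced in
the first item above. The real pipeline keeps its keys in a dedup DB;
the whitespace normalization and the in-memory set here are assumptions
for illustration only:

import hashlib

def prompt_key(prompt: str) -> str:
    normalized = " ".join(prompt.split())  # assumed normalization
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

seen: set[str] = set()  # stand-in for the dedup DB

def keep_pair(prompt: str) -> bool:
    """Return True the first time a prompt is seen, False for duplicates."""
    key = prompt_key(prompt)
    if key in seen:
        return False  # re-harvested duplicate: the existing legacy row wins
    seen.add(key)
    return True

Because the key is computed on the prompt alone, a re-harvested pair with
a longer response hashes to the same key and is skipped, which is why
picking up longer versions of legacy rows would require a dedup-DB rebuild.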

Going forward, every new harvest tick captures full-length material.
Once compute scales (Lightning H200 in the v2 path), the seq=16K
training will start using the long pairs effectively.

bin/dataset-mirror.sh CHANGED
@@ -313,8 +313,8 @@ for src_id, slug in SOURCES:
         if not r and row.get("answer"):
             r = str(row["answer"])

-        p = str(p).strip()[:6000]
-        r = str(r).strip()[:8000]
+        p = str(p).strip()[:100000]
+        r = str(r).strip()[:200000]
         if len(p) < 20 or len(r) < 30:
             continue
         if not is_relevant(p, r):
bin/parquet-direct-ingest.sh CHANGED
@@ -96,7 +96,7 @@ try:
     row = {c: table.column(c)[i].as_py() for c in cols}
     # Detect schema by available columns + extract prompt+response
     if 'text' in cols:
-        text = str(row.get('text','') or '')[:8000]
+        text = str(row.get('text','') or '')[:200000]
         if len(text) < 500: skipped += 1; continue
         # Web-text quality filter
         if not any(s in text for s in ('?','\`\`\`','# ','## ')) and not any(s in text.lower() for s in ('step ','first,','to solve','function ','def ','class ')):
@@ -110,10 +110,10 @@ try:
         response = text
     elif 'instruction' in cols and 'response' in cols:
         prompt = str(row.get('instruction','') or '')[:4000]
-        response = str(row.get('response','') or '')[:8000]
+        response = str(row.get('response','') or '')[:200000]
         if len(prompt) < 30 or len(response) < 30: skipped += 1; continue
     elif 'content' in cols and 'language' in cols:
-        code = str(row.get('content','') or '')[:6000]
+        code = str(row.get('content','') or '')[:100000]
         lang = str(row.get('language','') or 'code')
         if len(code) < 80 or len(code) > 6000: skipped += 1; continue
         prompt = f'Explain this {lang} code:'
@@ -130,7 +130,7 @@ try:
             'ts': time.time(),
             'source': f'parquet:{src_repo}',
             'parquet_shard': '\$shard_name',
-            'prompt': prompt[:8000],
+            'prompt': prompt[:100000],
             'response': response[:12000],
         }, ensure_ascii=False) + '\n')
         written += 1
bin/v2/build-data-pipeline.sh CHANGED
@@ -106,7 +106,7 @@ with open(out_path, "w") as f:
     if u and a: p, r = u, a

     if not p or not r: continue
-    p, r = str(p)[:6000].strip(), str(r)[:8000].strip()
+    p, r = str(p)[:100000].strip(), str(r)[:200000].strip()

     # Sanitize: drop polluted/PII/secrets/refusals
     v = filter_pair(p, r)
bin/v2/bulk-mirror-worker.sh CHANGED
@@ -66,7 +66,7 @@ with open(out_path, "w") as f:
     a = next((m.get("content","") or m.get("value","") for m in msgs if m.get("role") in ("assistant","gpt") or m.get("from") in ("assistant","gpt")), "")
     if u and a: p, r = u, a
     if not p or not r: continue
-    p, r = str(p)[:6000].strip(), str(r)[:8000].strip()
+    p, r = str(p)[:100000].strip(), str(r)[:200000].strip()
     if len(p) < 20 or len(r) < 30: continue
     v = filter_pair(p, r)
     if not v["keep"]: continue
bin/v2/magpie-self-instruct.py CHANGED
@@ -154,8 +154,8 @@ def main():
         continue

     fout.write(json.dumps({
-        "prompt": user_q[:6000],
-        "response": asst_r[:8000],
+        "prompt": user_q[:100000],
+        "response": asst_r[:200000],
         "source": f"magpie-{use_model}",
         "domain_persona": sys_prompt,
         "ts": datetime.utcnow().isoformat(),
bin/v2/sdft-trainer.py CHANGED
@@ -121,8 +121,8 @@ def process(prompt: str, gold: str) -> dict | None:
     if not filter_pair(prompt, distilled)["keep"]:
         return None
     return {
-        "prompt": prompt[:6000],
-        "response": distilled[:6000],
+        "prompt": prompt[:100000],
+        "response": distilled[:200000],
         "source": "sdft",
         "meta": {
             "y_hat_len": len(y_hat),
bin/v2/self-refine-loop.py CHANGED
@@ -113,8 +113,8 @@ def process(prompt: str) -> dict | None:
         return None

     return {
-        "prompt": prompt[:6000],
-        "response": answer[:6000],
+        "prompt": prompt[:100000],
+        "response": answer[:200000],
         "source": "self-refine",
         "meta": {
             "iterations_used": len(history),
bin/v2/streaming-mirror-worker.sh CHANGED
@@ -114,7 +114,7 @@ with open(out_path, "a") as f:
         p, r = t[:cut].strip(), t[cut:].strip()
     else:
         continue
-    p = str(p)[:6000].strip(); r = str(r)[:8000].strip()
+    p = str(p)[:100000].strip(); r = str(r)[:200000].strip()
     if len(p) < 20 or len(r) < 30: continue
     v = filter_pair(p, r)
     if not v["keep"]: continue
bin/v2/tool-trace-collector.py CHANGED
@@ -132,7 +132,7 @@ def _trace_to_pair(prompt_ctx: str, traces: list[dict]) -> dict | None:
         return None
     return {
         "prompt": prompt_ctx[:4000],
-        "response": asst_text[:6000],
+        "response": asst_text[:200000],
         "source": "tool-trace",
         "meta": {
             "n_calls": len(traces),
bin/v2/verify-trace-generator.py CHANGED
@@ -157,8 +157,8 @@ def synthesize_trace(prompt: str, gold: str) -> dict | None:
         return None

     return {
-        "prompt": prompt[:6000],
-        "response": trace[:8000],
+        "prompt": prompt[:100000],
+        "response": trace[:200000],
         "source": "verify-trace",
         "meta": {"domain": domain, "n_probes": len(probes)},
     }