ashirato committed
Commit 21c6a6e · 1 Parent(s): 57578c8

fix(mirror): write to batches/mirror-merged with the clean {prompt, response} schema only; drop the source/ts extras and the raw-mirrors/enriched paths that broke the pyarrow cast in v1 training
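The failure mode named in the subject is a schema mismatch: once some uploaded chunks carry extra source/ts columns while others carry only the two training columns, pyarrow refuses to combine or cast them into one table, which is roughly what the training-side loader surfaced as a cast error. A minimal sketch of the problem and of the fix this commit enforces at write time (illustrative only, not code from the repo):

import pyarrow as pa

# One chunk written the old way (extra source/ts columns) and one written
# the clean way ({prompt, response} only).
mixed = pa.Table.from_pylist([
    {"prompt": "p1", "response": "r1", "source": "mirror/some-slug", "ts": 1714000000},
])
clean = pa.Table.from_pylist([
    {"prompt": "p2", "response": "r2"},
])

try:
    pa.concat_tables([mixed, clean])  # schemas differ, so this raises
except pa.ArrowInvalid as err:
    print("mixed schemas refuse to combine:", err)

# Once every chunk is reduced to the same two columns, as this commit does
# at write time, combining shards needs no cast step at all.
uniform = pa.concat_tables([mixed.select(["prompt", "response"]), clean])
print(uniform.schema)  # prompt: string, response: string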

Files changed (1)
  1. bin/dataset-mirror.sh +19 -19
bin/dataset-mirror.sh CHANGED
@@ -30,8 +30,9 @@ For each big community dataset on the SOURCES list:
   1. Use huggingface_hub.snapshot_download to pull the parquet shards
   2. Stream-read each shard, FILTER for SDLC/coding relevance, DEDUP via central
      store, normalize to {prompt, response} schema
-  3. Buffer enriched rows and upload one parquet per source under
-     enriched/<slug>/<shard>.parquet
+  3. Buffer normalized rows and upload one parquet per source under
+     batches/mirror-merged/<date>/<slug>-chunk-<ts>.parquet (clean
+     {prompt, response} schema only — no extra cols)
   4. Stamp a marker so we don't re-mirror next cycle

   Why filter: user feedback — 'enrich only the datasets on relevant topics + dedup and drop the rest'.
@@ -163,7 +164,7 @@ for src_id, slug in SOURCES:
       # Old stamp format (just timestamp) — also retry once with new extractor
       pass
   target = pick_repo(slug)
-  print(f"\n▶ enrich+mirror {src_id} → {target}/enriched/{slug}/", flush=True)
+  print(f"\n▶ enrich+mirror {src_id} → {target}/batches/mirror-merged/{slug}/", flush=True)
   try:
       # Download parquet/jsonl shards
       local = snapshot_download(
@@ -178,16 +179,10 @@ for src_id, slug in SOURCES:
       import pyarrow as pa
       import pyarrow.parquet as pq
   except Exception:
-      print(f" ❌ pyarrow not available falling back to raw mirror", flush=True)
-      # last-resort raw upload
-      for f in sorted(local_path.rglob("*.parquet")):
-          if not f.is_file() or f.stat().st_size < 1024: continue
-          api.upload_file(path_or_fileobj=str(f),
-                          path_in_repo=f"raw-mirrors/{slug}/{f.name}",
-                          repo_id=target, repo_type="dataset",
-                          commit_message=f"raw-mirror: {src_id} fallback")
-          mirrored += 1
-          time.sleep(2)
+      # No pyarrow: skip. A raw upload would inject mixed-schema parquet
+      # (with full source cols) that breaks training-time dataset loading.
+      # Better to lose this source for this cycle than corrupt the schema again.
+      print(f" ⏭ pyarrow missing, skipping {src_id} (would write messy schema)", flush=True)
       continue

   scanned = kept = duped = irrelevant = 0
@@ -264,18 +259,22 @@ for src_id, slug in SOURCES:
   if HAS_DEDUP and not DedupStore.is_new(p, source=f"mirror-{slug}"):
       duped += 1
       continue
-  out_rows.append({"prompt": p, "response": r,
-                   "source": f"mirror/{slug}", "ts": int(time.time())})
+  # CLEAN SCHEMA: only {prompt, response}. Source attribution moves
+  # to filename (batches/mirror-merged/<slug>/...) so training-time
+  # consumers don't have to handle extra cols. This was the cause of
+  # the pyarrow CastError that blocked v1 training (2026-04-29).
+  out_rows.append({"prompt": p, "response": r})
   kept += 1

   # Periodic flush — keeps memory bounded for huge sources
   if len(out_rows) >= 50000:
       chunk_path = CACHE / f"{slug}-chunk-{int(time.time())}.parquet"
       pq.write_table(pa.Table.from_pylist(out_rows), chunk_path, compression="snappy")
+      date_tag = time.strftime("%Y-%m-%d")
       api.upload_file(path_or_fileobj=str(chunk_path),
-                      path_in_repo=f"enriched/{slug}/chunk-{int(time.time())}.parquet",
+                      path_in_repo=f"batches/mirror-merged/{date_tag}/{slug}-chunk-{int(time.time())}.parquet",
                       repo_id=target, repo_type="dataset",
-                      commit_message=f"enriched mirror: {src_id} +{len(out_rows)} rows")
+                      commit_message=f"clean mirror: {src_id} +{len(out_rows)} rows")
       mirrored += 1
       out_rows = []
       chunk_path.unlink()
@@ -284,10 +283,11 @@ for src_id, slug in SOURCES:
   if out_rows:
       chunk_path = CACHE / f"{slug}-final-{int(time.time())}.parquet"
       pq.write_table(pa.Table.from_pylist(out_rows), chunk_path, compression="snappy")
+      date_tag = time.strftime("%Y-%m-%d")
       api.upload_file(path_or_fileobj=str(chunk_path),
-                      path_in_repo=f"enriched/{slug}/final-{int(time.time())}.parquet",
+                      path_in_repo=f"batches/mirror-merged/{date_tag}/{slug}-final-{int(time.time())}.parquet",
                       repo_id=target, repo_type="dataset",
-                      commit_message=f"enriched mirror final: {src_id} +{len(out_rows)} rows")
+                      commit_message=f"clean mirror final: {src_id} +{len(out_rows)} rows")
       mirrored += 1
       chunk_path.unlink()

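Step 2 of the module docstring (stream-read each shard, filter, dedup, normalize) is only summarized in this diff; the hunks above show the buffering and upload side. A hypothetical sketch of that per-shard pass, assuming pyarrow's batch iterator; extract_pair and is_sdlc_relevant are stand-ins for the script's real extractor and relevance filter, not its actual helpers:

import pyarrow.parquet as pq

def iter_clean_rows(shard_path, extract_pair, is_sdlc_relevant):
    # Stream one parquet shard in record batches so a multi-GB file never
    # has to sit in memory whole, and yield only clean two-column rows.
    pf = pq.ParquetFile(shard_path)
    for batch in pf.iter_batches(batch_size=10_000):
        for row in batch.to_pylist():
            pair = extract_pair(row)  # hypothetical normalizer: row -> (prompt, response) or None
            if pair is None:
                continue
            prompt, response = pair
            if not is_sdlc_relevant(prompt, response):  # hypothetical relevance filter
                continue
            yield {"prompt": prompt, "response": response}  # no source/ts extras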
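The payoff at training time is that every file under batches/mirror-merged/ now shares one flat schema, so the whole tree can be read in a single pass. A minimal sketch, assuming the stock parquet loader from the datasets library and a local copy of the mirrored files (the glob path is illustrative):

from datasets import load_dataset

# All chunk-*/final-* files carry exactly the same two columns, so a single
# glob over the mirror output loads without any per-source schema handling.
ds = load_dataset(
    "parquet",
    data_files="batches/mirror-merged/*/*.parquet",
    split="train",
)
print(ds.column_names)  # expected: ['prompt', 'response']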