Ashira Pitchayapakayakul committed on
Commit ff1eae4 · 1 Parent(s): ea561c8

fix: 4 critical bugs + incremental push (gentle ingest, no crashes)


ROOT CAUSES:
1. push-training-to-hf.sh: tried to upload the entire 35K-pair file in one HfApi call
   → it kept failing → the offset never advanced → infinite retry of the same massive blob.
   FIX: chunk-based upload (1500 pairs/call, ~700KB/upload, ~17min to drain the backlog);
   see the sketch after this list. Each chunk goes to a unique remote path
   'batches/YYYY-MM-DD/chunk-HHMMSS-N.jsonl', and the offset advances on each successful chunk.

2. surrogate-self-ingest.sh: the filter required prompt+response >= 50 chars
   → all 35K agentic-crawler pairs were skipped (placeholder responses ~40 chars)
   → 'total indexed: 0' despite processing 35K pairs.
   FIX: relax the filter to 'both fields non-empty' and batch-process 5000 pairs/run.

3. dataset-enrich.sh: hardcoded ~/.claude/venv/bin/python (a Mac path, missing on Linux).
   FIX: use bare 'python3' (PATH-based).

4. scrape-keyword-tuner.sh + perf-watchdog.sh: used vm_stat (macOS only) with no fallback.
   FIX: check /proc/meminfo first (Linux), fall back to vm_stat (macOS).
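
The heart of fix 1 is the offset bookkeeping: slice exactly one chunk past the last pushed line, upload only that, and move the offset only when the upload succeeds. A minimal sketch of that loop body, using the same variable names as the script, with the actual HfApi call stubbed out as a hypothetical upload_chunk function:

PREV_OFFSET=$(cat "$OFFSET_FILE" 2>/dev/null || echo 0)
QUEUED=$(( $(wc -l < "$SRC") - PREV_OFFSET ))
TAKE=$(( QUEUED < CHUNK_SIZE ? QUEUED : CHUNK_SIZE ))    # never more than one chunk per run
sed -n "$((PREV_OFFSET + 1)),$((PREV_OFFSET + TAKE))p" "$SRC" > "$SLICE"
if upload_chunk "$SLICE"; then                           # hypothetical stand-in for the HfApi upload
  echo "$(( PREV_OFFSET + TAKE ))" > "$OFFSET_FILE"      # advance only after a confirmed upload
fi

Because the remote path embeds the date, time, and new offset, a retried chunk can never clobber an earlier upload.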

USER REQUESTED CONSTRAINT (honored):
'Don't ingest it all in one go or it dies; do it bit by bit' (translated from Thai) →
- self-ingest: BATCH_SIZE=5000 (configurable via the SELF_INGEST_BATCH env var)
- push-training: CHUNK_SIZE=1500 (configurable via the TRAINING_PUSH_CHUNK env var)
- Both advance their offsets incrementally; safe to interrupt and resume (see the usage sketch below)
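
For reference, a single gentler-than-default pass can be run by hand with the env vars above (the smaller sizes here are purely illustrative):

SELF_INGEST_BATCH=1000 bin/surrogate-self-ingest.sh    # index 1000 pairs, then stop
TRAINING_PUSH_CHUNK=500 bin/push-training-to-hf.sh     # push 500 pairs, then stop

Each run advances its offset file, so a later run resumes where the last successful batch ended.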

bin/dataset-enrich.sh CHANGED
@@ -23,7 +23,7 @@ mkdir -p "$WORK" "$(dirname "$LOG")"
 
 echo "[$(date +%H:%M:%S)] dataset enrich start" | tee "$LOG"
 
-~/.claude/venv/bin/python <<'PYEOF' 2>&1 | tee -a "$LOG"
+python3 <<'PYEOF' 2>&1 | tee -a "$LOG"
 from huggingface_hub import HfApi
 from pathlib import Path
 from datasets import load_dataset
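
Since the script now relies on whichever python3 is on PATH, a quick sanity check that the interpreter can see the packages imported above (nothing beyond those imports is assumed):

python3 -c "import huggingface_hub, datasets; print('deps ok')"
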
bin/perf-watchdog.sh CHANGED
@@ -25,10 +25,16 @@ PROC_CAP=30
 DISK_WARN_GB=2
 
 get_load() {
-  uptime | awk -F'load averages:' '{print $2}' | awk '{print int($1)}'
+  uptime | sed -E 's/.*load average[s]?:[[:space:]]*//' | awk -F',' '{print int($1)}'
 }
 get_free_pages() {
-  vm_stat | awk '/Pages free/{gsub("[.]","",$3); print $3}'
+  if [[ -r /proc/meminfo ]]; then
+    awk '/MemAvailable/{print int($2/4)}' /proc/meminfo
+  elif command -v vm_stat >/dev/null 2>&1; then
+    vm_stat | awk '/Pages free/{gsub("[.]","",$3); print $3}'
+  else
+    echo 999999
+  fi
 }
 get_scrape_procs() {
   pgrep -f "fs-to-jsonl\|github-bulk-train\|chroma-to-training\|bulk-scrape-burst" 2>/dev/null | wc -l | tr -d ' '
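
To sanity-check the new fallback on a given box, the two probes can be run directly. On Linux, MemAvailable is reported in kB, so dividing by 4 approximates the 4 KiB-page unit that vm_stat reports on macOS (a rough equivalence, not exact parity):

awk '/MemAvailable/{print int($2/4)}' /proc/meminfo            # Linux: free pages (approx.)
vm_stat | awk '/Pages free/{gsub("[.]","",$3); print $3}'      # macOS: free pages
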
bin/push-training-to-hf.sh CHANGED
@@ -1,42 +1,61 @@
 #!/usr/bin/env bash
-# Push accumulated training pairs from local jsonl → axentx/surrogate-1-training-pairs.
-# Uses python HfApi only (CLI syntax changed across versions; not reliable).
-# Idempotent: tracks last-pushed line offset so duplicates are skipped.
-# Only updates offset if push actually succeeded.
+# Push training pairs → HF dataset, INCREMENTALLY in small batches.
+#
+# Strategy: never upload the whole file. Each cron run pushes ONE chunk of
+# CHUNK_SIZE pairs to a date-stamped file (one per day). Small uploads = fast,
+# resilient, avoid timeouts on large blobs.
+#
+# Idempotent: tracks last-pushed line offset. Only advances on success.
+# If 35K pairs queued, drains over ~17 min (CHUNK_SIZE=1500 every 3 min).
 set -uo pipefail
 set -a; source "$HOME/.hermes/.env" 2>/dev/null; set +a
 
 SRC="$HOME/.surrogate/training-pairs.jsonl"
 OFFSET_FILE="$HOME/.surrogate/.training-push-offset"
 LOG="$HOME/.surrogate/logs/training-push.log"
+CHUNK_SIZE="${TRAINING_PUSH_CHUNK:-1500}"
 mkdir -p "$(dirname "$LOG")"
 
 [[ ! -f "$SRC" ]] && { echo "[$(date +%H:%M:%S)] no source $SRC" | tee -a "$LOG"; exit 0; }
 
 CUR_LINES=$(wc -l < "$SRC" | tr -d ' ')
 PREV_OFFSET=$(cat "$OFFSET_FILE" 2>/dev/null || echo 0)
-NEW_LINES=$(( CUR_LINES - PREV_OFFSET ))
+QUEUED=$(( CUR_LINES - PREV_OFFSET ))
 
-echo "[$(date +%H:%M:%S)] training push: $NEW_LINES new pairs (offset=$PREV_OFFSET, total=$CUR_LINES)" | tee -a "$LOG"
-[[ $NEW_LINES -le 0 ]] && exit 0
+echo "[$(date +%H:%M:%S)] queued=$QUEUED (offset=$PREV_OFFSET total=$CUR_LINES chunk=$CHUNK_SIZE)" | tee -a "$LOG"
+[[ $QUEUED -le 0 ]] && exit 0
 
-# Resolve token from any HF env var name
+# Take just one chunk (don't try to push everything at once — that's why it kept failing)
+TAKE=$QUEUED
+[[ $TAKE -gt $CHUNK_SIZE ]] && TAKE=$CHUNK_SIZE
+
+# Resolve token
 HF_AUTH="${HF_TOKEN:-${HUGGING_FACE_HUB_TOKEN:-${HUGGINGFACE_TOKEN:-}}}"
 if [[ -z "$HF_AUTH" ]]; then
-  echo "[$(date +%H:%M:%S)] ERR: no HF_TOKEN env — cannot upload" | tee -a "$LOG"
+  echo "[$(date +%H:%M:%S)] ERR: no HF_TOKEN — cannot upload" | tee -a "$LOG"
   exit 1
 fi
 
-# Slice new pairs to a date-stamped file
+# Slice this chunk to a unique-per-cron-fire file (no overwrite)
 DATE_TAG=$(date +%Y-%m-%d)
-SLICE="$HOME/.surrogate/.push-slice-${DATE_TAG}.jsonl"
-tail -n "$NEW_LINES" "$SRC" >> "$SLICE"
+TIME_TAG=$(date +%H%M%S)
+SLICE_DIR="$HOME/.surrogate/.push-slices"
+mkdir -p "$SLICE_DIR"
+SLICE="$SLICE_DIR/${DATE_TAG}_${TIME_TAG}.jsonl"
+
+# Take TAKE lines starting AFTER prev offset
+sed -n "$((PREV_OFFSET + 1)),$((PREV_OFFSET + TAKE))p" "$SRC" > "$SLICE"
+SLICE_LINES=$(wc -l < "$SLICE" | tr -d ' ')
+SLICE_BYTES=$(wc -c < "$SLICE" | tr -d ' ')
+echo "[$(date +%H:%M:%S)] uploading slice: $SLICE_LINES lines / $((SLICE_BYTES/1024)) KB" | tee -a "$LOG"
 
-# Upload via python HfApi (explicit token, explicit error handling)
-if HF_AUTH="$HF_AUTH" python3 - "$SLICE" "$NEW_LINES" "$DATE_TAG" >> "$LOG" 2>&1 <<'PYEOF'
-import sys, os, json, hashlib, time
-from pathlib import Path
-slice_path, n_pairs, date_tag = sys.argv[1], int(sys.argv[2]), sys.argv[3]
+# Upload to a chunk-specific filename — never overwrites, just adds new files
+NEW_OFFSET=$(( PREV_OFFSET + TAKE ))
+REMOTE_PATH="batches/${DATE_TAG}/chunk-${TIME_TAG}-${NEW_OFFSET}.jsonl"
+
+if HF_AUTH="$HF_AUTH" python3 - "$SLICE" "$REMOTE_PATH" "$SLICE_LINES" >> "$LOG" 2>&1 <<'PYEOF'
+import sys, os, time
+slice_path, remote, n_lines = sys.argv[1], sys.argv[2], sys.argv[3]
 hf_auth = os.environ["HF_AUTH"]
 
 try:
@@ -45,47 +64,27 @@ except ImportError:
     print(f"[{time.strftime('%H:%M:%S')}] ERR: huggingface_hub not installed")
     sys.exit(2)
 
-# Append to a daily file rather than overwrite — accumulate across pushes
 api = HfApi(token=hf_auth)
-remote_path = f"auto-orchestrate-{date_tag}.jsonl"
 try:
-    # Check if remote file exists; if yes, fetch + concat to avoid losing prior pushes
-    try:
-        existing = api.hf_hub_download(
-            repo_id="axentx/surrogate-1-training-pairs",
-            filename=remote_path,
-            repo_type="dataset",
-            local_dir="/tmp/hf-push-cache",
-            local_dir_use_symlinks=False,
-        )
-        # Concat: existing + slice → new payload
-        merged = Path("/tmp/hf-push-cache") / f"merged-{remote_path}"
-        with open(merged, "wb") as out:
-            out.write(Path(existing).read_bytes())
-            out.write(Path(slice_path).read_bytes())
-        upload_path = str(merged)
-    except Exception:
-        upload_path = slice_path
-
     api.upload_file(
-        path_or_fileobj=upload_path,
-        path_in_repo=remote_path,
+        path_or_fileobj=slice_path,
+        path_in_repo=remote,
         repo_id="axentx/surrogate-1-training-pairs",
         repo_type="dataset",
-        commit_message=f"auto-orchestrate: +{n_pairs} pairs ({time.strftime('%H:%M')})",
+        commit_message=f"chunk: +{n_lines} pairs ({time.strftime('%H:%M')})",
    )
-    print(f"[{time.strftime('%H:%M:%S')}] ✅ uploaded {n_pairs} new pairs to {remote_path}")
+    print(f"[{time.strftime('%H:%M:%S')}] ✅ uploaded → {remote}")
    sys.exit(0)
 except Exception as e:
    print(f"[{time.strftime('%H:%M:%S')}] ❌ {type(e).__name__}: {str(e)[:300]}")
    sys.exit(3)
 PYEOF
 then
-    # Only advance offset on actual upload success
-    echo "$CUR_LINES" > "$OFFSET_FILE"
+    echo "$NEW_OFFSET" > "$OFFSET_FILE"
    rm -f "$SLICE"
-    echo "[$(date +%H:%M:%S)] push complete · offset → $CUR_LINES" | tee -a "$LOG"
+    REMAINING=$(( CUR_LINES - NEW_OFFSET ))
+    echo "[$(date +%H:%M:%S)] offset → $NEW_OFFSET · remaining=$REMAINING (next run)" | tee -a "$LOG"
 else
-    echo "[$(date +%H:%M:%S)] push failed — offset unchanged ($PREV_OFFSET), slice retained for retry" | tee -a "$LOG"
+    echo "[$(date +%H:%M:%S)] push failed — offset still $PREV_OFFSET, slice retained: $SLICE" | tee -a "$LOG"
    exit 1
 fi
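
The cron cadence drains the backlog one chunk per run; a hand-driven drain is just a loop around the script until the offset catches up with the source file (a sketch, assuming the same SRC and OFFSET_FILE paths the script uses, run from the repo root):

while [[ "$(cat "$HOME/.surrogate/.training-push-offset" 2>/dev/null || echo 0)" -lt \
         "$(wc -l < "$HOME/.surrogate/training-pairs.jsonl")" ]]; do
  bin/push-training-to-hf.sh || break    # stop on the first failed chunk
  sleep 5                                # brief pause between uploads
done

Because the offset only moves after a successful upload, the loop can be interrupted and resumed at any point.
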
bin/surrogate-self-ingest.sh CHANGED
@@ -36,40 +36,57 @@ NEW=$(( CUR - PREV ))
 
 echo "[$(date +%H:%M:%S)] ingesting $NEW new pairs into FTS index" | tee -a "$LOG"
 
-tail -n "$NEW" "$SRC" | python3 - "$INDEX" >> "$LOG" 2>&1 <<'PYEOF'
+# Process in batches of 5000 — gentle, doesn't blow memory
+BATCH_SIZE="${SELF_INGEST_BATCH:-5000}"
+TAKE=$NEW
+[[ $TAKE -gt $BATCH_SIZE ]] && TAKE=$BATCH_SIZE
+echo "[$(date +%H:%M:%S)] processing $TAKE / $NEW (batch_size=$BATCH_SIZE)" | tee -a "$LOG"
+
+sed -n "$((PREV + 1)),$((PREV + TAKE))p" "$SRC" | python3 - "$INDEX" >> "$LOG" 2>&1 <<'PYEOF'
 import sys, json, sqlite3
-from datetime import datetime
 db = sys.argv[1]
 con = sqlite3.connect(db)
 con.execute("BEGIN")
-n = 0
+n = skipped_short = skipped_parse = 0
 for line in sys.stdin:
     try:
         d = json.loads(line)
-        src = d.get("source", "?")
-        role = src.replace("orchestrate-", "") if src.startswith("orchestrate-") else src
-        ts = d.get("ts", 0)
-        prompt = (d.get("prompt") or "")[:4000]
-        response = (d.get("response") or "")[:8000]
-        if len(prompt) < 50 or len(response) < 50:
-            continue
+    except Exception:
+        skipped_parse += 1
+        continue
+    src = d.get("source", "?")
+    role = src.replace("orchestrate-", "") if src.startswith("orchestrate-") else src
+    ts = d.get("ts", 0)
+    prompt = (d.get("prompt") or "")[:4000]
+    response = (d.get("response") or "")[:8000]
+    # Relaxed filter: index anything with both fields present (was 50-char min)
+    # Even short pairs are useful for tag-based retrieval
+    if not prompt or not response:
+        skipped_short += 1
+        continue
+    try:
         con.execute(
             "INSERT INTO pairs(source,role,prompt,response,ts) VALUES (?,?,?,?,?)",
             (src, role, prompt, response, str(ts))
        )
        n += 1
    except Exception as e:
-        print(f" skip line: {type(e).__name__}", file=sys.stderr)
+        print(f" insert err: {type(e).__name__}: {str(e)[:80]}", file=sys.stderr)
 con.commit()
-print(f" ingested {n} pairs (FTS index)", flush=True)
+print(f" inserted={n} skipped_parse={skipped_parse} skipped_empty={skipped_short}", flush=True)
 PYEOF
 
-echo "$CUR" > "$OFFSET_FILE"
-echo "[$(date +%H:%M:%S)] ingest done · offset → $CUR" | tee -a "$LOG"
+# Advance offset by what we actually processed
+NEW_OFFSET=$(( PREV + TAKE ))
+echo "$NEW_OFFSET" > "$OFFSET_FILE"
+echo "[$(date +%H:%M:%S)] ingest batch done · offset → $NEW_OFFSET (remaining: $((CUR - NEW_OFFSET)))" | tee -a "$LOG"
 
-# Print quick stats
+# Quick stats
 TOTAL=$(sqlite3 "$INDEX" "SELECT COUNT(*) FROM pairs" 2>/dev/null)
+TOTAL=${TOTAL:-0}
 BY_ROLE=$(sqlite3 "$INDEX" "SELECT role, COUNT(*) FROM pairs GROUP BY role ORDER BY 2 DESC LIMIT 5" 2>/dev/null)
 echo " total indexed: $TOTAL" | tee -a "$LOG"
-echo " top roles:" | tee -a "$LOG"
-echo "$BY_ROLE" | sed 's/^/ /' | tee -a "$LOG"
+[[ -n "$BY_ROLE" ]] && {
+  echo " top roles:" | tee -a "$LOG"
  echo "$BY_ROLE" | sed 's/^/ /' | tee -a "$LOG"
+}
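
If 'total indexed: 0' ever shows up again, a quick diagnostic is to count how many queued pairs the old 50-char filter would have dropped versus the new non-empty filter (a sketch; the training-pairs path matches the script, the counting logic itself is ad hoc):

python3 - "$HOME/.surrogate/training-pairs.jsonl" <<'PY'
import json, sys
old_ok = new_ok = total = 0
for line in open(sys.argv[1], encoding="utf-8", errors="replace"):
    try:
        d = json.loads(line)
    except Exception:
        continue
    p, r = (d.get("prompt") or ""), (d.get("response") or "")
    total += 1
    old_ok += len(p) >= 50 and len(r) >= 50   # old strict filter
    new_ok += bool(p) and bool(r)             # new relaxed filter
print(f"total={total} pass_old={old_ok} pass_new={new_ok}")
PY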