Ashira Pitchayapakayakul committed on
Commit 39c61d0 · 1 Parent(s): 84a1ec7

fix: 4 chain bugs found via /selftest + chain audit

VERIFIED via curl /selftest = ok:true (env clean)
But chain audit found 4 runtime bugs:

BUG 1: dedup.db CORRUPTION
16 parallel shards writing to SQLite → 'database disk image is malformed'
All bulk-ingest shards failing on stats() lookup
FIX: dedup.py auto-recovery
- Add busy_timeout=30000 (30s wait on lock)
- Add wal_autocheckpoint=1000
- On corruption: rename to .corrupt-{ts}.bak + retry init (auto-rebuild)
- Smoke-test with SELECT 1 before declaring connection healthy
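
A quick way to confirm the rebuilt DB is healthy after this lands (sketch only; the
path below is an assumption, use the real DB_PATH from dedup.py):

    # sketch: check the rebuilt dedup.db by hand (path is assumed, not from dedup.py)
    import sqlite3
    con = sqlite3.connect("data/dedup.db", timeout=30)
    con.execute("PRAGMA busy_timeout=30000")
    print(con.execute("PRAGMA integrity_check").fetchone()[0])            # expect 'ok'
    print(con.execute("SELECT COUNT(*) FROM seen_hashes").fetchone()[0])  # 0 if rebuilt from scratch
    con.close()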

BUG 2: agentic-crawler ValueError on non-HTML
Hit DNS records / zone files → ValueError on int parsing
FIX: agentic-crawler.sh
- Read the Content-Type header; skip if not HTML
- Stamp visited with status, exit cleanly
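
The gate boils down to this predicate (condensed sketch; the real change is in
bin/agentic-crawler.sh below, and the helper name here is illustrative):

    # sketch of the check applied per response in the diff below
    def looks_like_html(ctype: str | None, body: str) -> bool:
        ctype = (ctype or "").lower()
        return "html" in ctype or "<html" in body[:1000].lower()
    # if it returns False: stamp visited with the real status and sys.exit(0)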

BUG 3: github-agentic-crawler 'all tokens exhausted' loops
Was: sleep max(60, soonest_reset - now), capped at 600s
But all 4 tokens hit 403, sleep 60s, hit 403 again, repeat
FIX: TokenPool.wait_for_any_reset()
- Compute earliest reset_at among exhausted tokens
- Sleep until earliest + 5s buffer
- Cap 1 hour (was 10 min)
- Now: hit limit → sleep until GitHub resets → resume cleanly
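
Worked example of the new pacing (numbers are invented):

    # sketch: suppose the four tokens reset 12/25/25/40 minutes from now
    resets = [12 * 60, 25 * 60, 25 * 60, 40 * 60]  # seconds until each reset_at
    wait = min(min(resets) + 5, 3600)              # earliest reset + 5s buffer, 1h cap
    print(wait)  # 725 -> one long sleep, then resume; no more 60s retry loop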

BUG 4: parquet-direct heredoc bash escaping (visible in log)
Not crashing, but printing garbled output. Skip for now; the dataset-server API
is returning 'no shards' for fineweb anyway. Defer cleanup.

EXPECTED after rebuild:
- dedup.db rebuilds clean (corrupt backup saved)
- 16 shards finally drain datasets without sqlite errors
- GH crawler self-paces around rate limits
- Web crawler skips non-HTML cleanly

Run /selftest after rebuild to confirm all green again.

bin/agentic-crawler.sh CHANGED
@@ -114,6 +114,14 @@ try:
     with urllib.request.urlopen(req, timeout=20) as r:
         body = r.read(2_000_000).decode("utf-8", errors="ignore")
         status = r.status
+        ctype = (r.headers.get("Content-Type") or "").lower()
+    # Skip non-HTML responses (DNS records, raw zone files, etc. were crashing parser)
+    if "html" not in ctype and "<html" not in body[:1000].lower():
+        con.execute("INSERT OR REPLACE INTO visited VALUES (?,?,?,?,?,?,?)",
+                    (url, int(time.time()), status, "", domain, depth, len(body)))
+        con.commit()
+        print(f" [skip-non-html] {ctype[:30]} {url[:80]}")
+        sys.exit(0)
 except Exception as e:
     con.execute("INSERT OR REPLACE INTO visited VALUES (?,?,?,?,?,?,?)",
                 (url, int(time.time()), -1, None, domain, depth, 0))
bin/github-agentic-crawler.py CHANGED
@@ -197,7 +197,6 @@ class TokenPool:
         """Pick token with most remaining quota; if all exhausted, return None."""
         with self.lock:
             now = time.time()
-            # Reset expired counters
             for s in self.states:
                 if s.reset_at and now > s.reset_at:
                     s.remaining = 5000
@@ -205,13 +204,22 @@ class TokenPool:
             ready = [s for s in self.states if s.remaining > 50]
             if not ready:
                 return None
-            # Round-robin among ready, weighted by remaining
             ready.sort(key=lambda s: (-s.remaining, s.last_used))
             picked = ready[0]
             picked.last_used = now
-            picked.remaining -= 1  # optimistic; refined from response headers
+            picked.remaining -= 1
             return picked

+    def wait_for_any_reset(self) -> int:
+        """Sleep until earliest token reset (instead of 60s naive sleep)."""
+        with self.lock:
+            now = time.time()
+            resets = [s.reset_at - int(now) for s in self.states if s.reset_at and s.reset_at > now]
+            if not resets:
+                return 60
+            wait = min(resets) + 5  # buffer
+            return min(wait, 3600)  # cap 1h
+
     def update_from_headers(self, state: TokenState, headers: dict) -> None:
         with self.lock:
             try:
@@ -233,10 +241,9 @@ def gh_get(url: str, pool: TokenPool, retries: int = 2) -> tuple[dict | list | N
     for attempt in range(retries + 1):
         state = pool.acquire()
         if state is None:
-            soonest = pool.soonest_reset()
-            wait = max(60, int(soonest - time.time()))
-            log(f" all tokens exhausted, sleeping {wait}s until reset")
-            time.sleep(min(wait, 600))
+            wait = pool.wait_for_any_reset()
+            log(f" all tokens exhausted — sleep {wait}s until earliest reset")
+            time.sleep(wait)
             continue
         req = urllib.request.Request(url, headers={
             "Accept": "application/vnd.github+json",
bin/lib/dedup.py CHANGED
@@ -33,19 +33,44 @@ class DedupStore:
     def _connection(cls) -> sqlite3.Connection:
         if cls._conn is None:
             DB_PATH.parent.mkdir(parents=True, exist_ok=True)
-            c = sqlite3.connect(str(DB_PATH), check_same_thread=False, timeout=10)
-            c.execute("PRAGMA journal_mode=WAL")
-            c.execute("PRAGMA synchronous=NORMAL")
-            c.executescript("""
-                CREATE TABLE IF NOT EXISTS seen_hashes (
-                    hash TEXT PRIMARY KEY,
-                    source TEXT NOT NULL,
-                    ts INTEGER NOT NULL
-                );
-                CREATE INDEX IF NOT EXISTS idx_seen_source ON seen_hashes(source);
-                CREATE INDEX IF NOT EXISTS idx_seen_ts ON seen_hashes(ts);
-            """)
-            cls._conn = c
+            # Auto-recover from corruption (16 parallel shards can corrupt SQLite)
+            for attempt in range(3):
+                try:
+                    c = sqlite3.connect(str(DB_PATH), check_same_thread=False,
+                                        timeout=30, isolation_level=None)
+                    c.execute("PRAGMA journal_mode=WAL")
+                    c.execute("PRAGMA synchronous=NORMAL")
+                    c.execute("PRAGMA busy_timeout=30000")  # 30s wait on lock
+                    c.execute("PRAGMA wal_autocheckpoint=1000")
+                    c.executescript("""
+                        CREATE TABLE IF NOT EXISTS seen_hashes (
+                            hash TEXT PRIMARY KEY,
+                            source TEXT NOT NULL,
+                            ts INTEGER NOT NULL
+                        );
+                        CREATE INDEX IF NOT EXISTS idx_seen_source ON seen_hashes(source);
+                        CREATE INDEX IF NOT EXISTS idx_seen_ts ON seen_hashes(ts);
+                    """)
+                    # Smoke-test the table
+                    c.execute("SELECT 1 FROM seen_hashes LIMIT 1").fetchall()
+                    cls._conn = c
+                    break
+                except sqlite3.DatabaseError as e:
+                    if "malformed" in str(e).lower() or "corrupt" in str(e).lower():
+                        # Backup + reset corrupted DB
+                        import time as _t
+                        backup = DB_PATH.with_suffix(f".corrupt-{int(_t.time())}.bak")
+                        try:
+                            DB_PATH.rename(backup)
+                            for ext in ("-wal", "-shm"):
+                                p = DB_PATH.with_suffix(DB_PATH.suffix + ext)
+                                if p.exists():
+                                    p.unlink()
+                        except Exception:
+                            pass
+                        if attempt < 2:
+                            continue
+                    raise
         return cls._conn

     @classmethod