moonlantern1 committed on
Commit
92ccf74
·
verified ·
1 Parent(s): 8c2a812

Upload brain_virality_predictor/downloader.py with huggingface_hub

Browse files
brain_virality_predictor/downloader.py ADDED
@@ -0,0 +1,191 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Video download orchestrator.
3
+ Consumes Apify-style JSON dumps (good/okish/bad) + cookies.txt,
4
+ uses yt-dlp to fetch videos into labeled folders.
5
+ """
6
+
7
+ import json, random, re, subprocess, time
8
+ from collections import Counter
9
+ from concurrent.futures import ThreadPoolExecutor, as_completed
10
+ from pathlib import Path
11
+ from typing import Dict, List, Tuple
12
+
13
+
14
def parse_json_labels(json_dir: Path) -> Dict[str, Path]:
    """
    Scan a directory for JSON label files and pick one file per label.

    File names are matched case-insensitively by substring:
        good  -> "good", "banger", "viral", "winner"
        bad   -> "bad", "flop", "loser", "poor"
        okish -> "okish", "neutral", "avg", "mid", "average"
    The groups are tested in that order, so a name matching several groups
    is assigned to the first one. Names containing a "(<digits>)" marker
    (duplicate downloads such as "good (1).json") are skipped.

    Args:
        json_dir: Directory to scan (non-recursive). A plain string path is
            accepted as well as a Path.

    Returns:
        Mapping of label -> chosen path. When several files match a label,
        the one with the shortest file name wins (ties broken
        alphabetically). Labels without any matching file are omitted.
    """
    # Order matters: it reproduces the good -> bad -> okish precedence.
    keyword_groups = (
        ("good", ("good", "banger", "viral", "winner")),
        ("bad", ("bad", "flop", "loser", "poor")),
        ("okish", ("okish", "neutral", "avg", "mid", "average")),
    )
    candidates = {lbl: [] for lbl in ("good", "okish", "bad")}

    # Path(...) lets callers pass a str, matching how download_all treats
    # its other path arguments.
    for p in Path(json_dir).glob("*.json"):
        name = p.name.lower()
        if re.search(r"\(\d+\)", name):  # skip duplicates like "(1)"
            continue
        for lbl, keywords in keyword_groups:
            if any(kw in name for kw in keywords):
                candidates[lbl].append(p)
                break

    return {
        lbl: min(paths, key=lambda p: (len(p.name), p.name))
        for lbl, paths in candidates.items()
        if paths
    }
36
+
37
+
38
def build_job_queue(json_paths: Dict[str, Path], out_root: Path) -> Tuple[List, List]:
    """
    Build deduplicated download jobs from JSON files.

    Each JSON file must contain a list of objects. The fields read from each
    object are "url", "handle", "outlier_score" and "platform"; all but
    "url" are optional. Entries without a URL, entries that are not objects,
    and URLs already queued (from any label) are skipped.

    Args:
        json_paths: Mapping of label -> path of that label's JSON file.
        out_root: Root output directory; created if missing. Each job
            targets <out_root>/<label>/<idx>_<score>_<handle>.mp4, where idx
            is the entry's position in its JSON file.

    Returns:
        (ig_jobs, other_jobs) where each job is (url, target_path, label).
        A job is an Instagram job when its platform field or its URL
        mentions Instagram.

    Raises:
        ValueError: If a JSON file's top-level value is not a list.
    """
    out_root = Path(out_root)
    out_root.mkdir(parents=True, exist_ok=True)

    def safe_name(s, n=30):
        # Keep only filesystem-friendly characters and cap the length.
        return "".join(c if c.isalnum() or c in "-_" else "_" for c in str(s))[:n]

    def score_tag(raw):
        # A null or malformed score must not abort the whole queue build.
        try:
            value = float(raw)
        except (TypeError, ValueError):
            value = 0.0
        return f"{value:.2f}".replace(".", "p")

    def out_path(label, item, idx):
        handle = safe_name(item.get("handle") or "unknown", 30)
        score = score_tag(item.get("outlier_score", 0))
        return out_root / label / f"{idx:03d}_{score}_{handle}.mp4"

    seen_urls = set()
    ig_jobs, other_jobs = [], []

    for label, json_path in json_paths.items():
        # Context manager closes the handle; explicit encoding avoids
        # depending on the platform's locale default.
        with open(json_path, encoding="utf-8") as fh:
            items = json.load(fh)
        if not isinstance(items, list):
            raise ValueError(f"{json_path}: expected a JSON list of items")

        for idx, item in enumerate(items):
            if not isinstance(item, dict):
                continue
            url = (item.get("url") or "").strip()
            if not url or url in seen_urls:
                continue
            seen_urls.add(url)
            target = out_path(label, item, idx)
            platform = (item.get("platform") or "").lower()
            is_ig = "instagram" in platform or "instagram.com" in url
            (ig_jobs if is_ig else other_jobs).append((url, target, label))

    return ig_jobs, other_jobs
70
+
71
+
72
def categorize_error(err: str) -> str:
    """
    Map a downloader error message to a coarse failure category.

    Matching is case-insensitive and the first matching rule wins, in this
    order: rate_limited, private, deleted, timeout.

    Args:
        err: Error text (e.g. the tail of yt-dlp's stderr). May be empty
            or None.

    Returns:
        One of "unknown" (no message), "rate_limited", "private",
        "deleted", "timeout" or "other".
    """
    if not err:
        return "unknown"
    t = err.lower()
    # Match an actual rate-limit phrase or a standalone 429; the bare
    # substring "rate" also fires on words like "generate" or "separate".
    if re.search(r"\brate[\s_-]?limit|too many requests|\b429\b", t):
        return "rate_limited"
    if "login" in t or "private" in t:
        return "private"
    # \b keeps 404 from matching inside longer digit runs (ids, byte counts).
    if re.search(r"not found|\b404\b|removed|unavailable", t):
        return "deleted"
    # Network errors are commonly worded "timed out" rather than "timeout".
    if "timeout" in t or "timed out" in t:
        return "timeout"
    return "other"
85
+
86
+
87
def download_one(url: str, target: Path, cookies: Path, timeout: int = 240) -> Dict:
    """
    Download a single video with yt-dlp and report the outcome.

    A target file larger than 100 000 bytes counts as a finished download;
    anything smaller is treated as missing or broken.

    Args:
        url: Video page URL.
        target: Destination file path; its parent directory is created.
        cookies: Path to a cookies.txt file. It is passed to yt-dlp only if
            the file exists; None is tolerated.
        timeout: Wall-clock limit in seconds for the yt-dlp process.

    Returns:
        Dict with keys "url", "path", "status" ("ok", "skip" or "fail"),
        "error" (tail of the error output, or None) and "category" (a
        categorize_error label on failure, otherwise None). Failures are
        reported through the dict rather than raised.
    """
    min_bytes = 100_000
    target = Path(target)

    def outcome(status, error=None, category=None):
        return {"url": url, "path": str(target), "status": status,
                "error": error, "category": category}

    def is_complete():
        try:
            return target.stat().st_size > min_bytes
        except OSError:
            return False

    if is_complete():
        return outcome("skip")

    target.parent.mkdir(parents=True, exist_ok=True)
    # yt-dlp does not overwrite an existing output file by default, so an
    # undersized leftover would make every retry a silent no-op.
    try:
        target.unlink()
    except OSError:
        pass

    cmd = [
        "yt-dlp",
        "-f", "best[ext=mp4]/mp4/best",
        "-o", str(target),
        "--no-warnings", "--quiet",
        "--retries", "5",
        "--socket-timeout", "30",
        "--no-playlist",
        "--max-filesize", "200M",
    ]
    if cookies is not None and Path(cookies).exists():
        cmd += ["--cookies", str(cookies)]
    # The URL comes from external JSON: "--" ends option parsing so a value
    # starting with "-" cannot be interpreted as a yt-dlp option.
    cmd += ["--", url]

    try:
        r = subprocess.run(cmd, capture_output=True, text=True,
                           errors="replace", timeout=timeout)
    except subprocess.TimeoutExpired:
        return outcome("fail", "timeout", "timeout")
    except Exception as e:
        # Deliberate catch-all (e.g. yt-dlp not installed): one bad job
        # must not abort the whole batch.
        return outcome("fail", str(e), categorize_error(str(e)))

    if r.returncode == 0 and is_complete():
        return outcome("ok")
    err = (r.stderr or r.stdout or "?")[-300:]
    return outcome("fail", err, categorize_error(err))
118
+
119
+
120
def download_all(json_dir: Path, cookies_path: Path, out_root: Path,
                 workers: int = 4, log_path: Path = None) -> List[Dict]:
    """
    Full orchestrated download pipeline.

    Phase 1 downloads all non-Instagram jobs in a thread pool. Phase 2
    downloads Instagram jobs one at a time, sleeping 2-4 s between
    attempts and 30-45 s once three consecutive attempts were rate-limited.
    Progress and a final summary are printed to stdout.

    Args:
        json_dir: Directory holding the good/okish/bad JSON label files.
        cookies_path: Path to a cookies.txt file (may not exist).
        out_root: Root directory; videos go to <out_root>/<label>/.
        workers: Number of parallel downloads in phase 1 (at least 1).
        log_path: Optional path where the results are written as JSON.

    Returns:
        List of result dicts as produced by download_one, phase 1 results
        (in completion order) followed by phase 2 results.

    Raises:
        ValueError: If the three label JSON files are not all found.
    """
    json_paths = parse_json_labels(json_dir)
    # Explicit raise instead of assert: asserts are stripped under "python -O".
    if len(json_paths) != 3:
        raise ValueError(
            f"Need 3 label JSONs (good/okish/bad), got: {list(json_paths.keys())}"
        )

    ig_jobs, other_jobs = build_job_queue(json_paths, out_root)
    total = len(ig_jobs) + len(other_jobs)
    print(f"Queue: {len(other_jobs)} TT/YT + {len(ig_jobs)} IG = {total} unique URLs")

    cookies = Path(cookies_path)
    has_cookies = cookies.exists() and "sessionid" in cookies.read_text(errors="ignore").lower()
    print(f"Cookies: {'logged in' if has_cookies else 'missing — IG may fail'}")

    icons = {"ok": "✓", "skip": "→", "fail": "✗"}
    results: List[Dict] = []

    # Phase 1: TT/YT in parallel
    print(f"\nPhase 1: TikTok + YouTube Shorts (parallel ×{workers})")
    t0 = time.perf_counter()
    # max(1, ...) because ThreadPoolExecutor rejects max_workers < 1.
    with ThreadPoolExecutor(max_workers=max(1, workers)) as ex:
        futs = [ex.submit(download_one, url, target, cookies)
                for url, target, _label in other_jobs]
        for i, fut in enumerate(as_completed(futs), 1):
            res = fut.result()
            results.append(res)
            icon = icons.get(res["status"], "?")
            # Print every 10th item and every failure to keep output short.
            if i % 10 == 0 or res["status"] == "fail":
                print(f" [{i}/{len(other_jobs)}] {icon} {Path(res['path']).name[:40]}")
    print(f"Phase 1 done in {(time.perf_counter()-t0)/60:.1f} min")

    # Phase 2: IG sequentially with jitter
    print("\nPhase 2: Instagram (sequential, 2-4s jitter)")
    consecutive_fails = 0
    n_ig = len(ig_jobs)
    for i, (url, target, label) in enumerate(ig_jobs, 1):
        res = download_one(url, target, cookies)
        results.append(res)
        icon = icons.get(res["status"], "?")
        cat_tag = f"[{res['category']}]" if res.get("category") else ""
        print(f" [{i}/{n_ig}] {icon} {label:<6} {Path(res['path']).name[:38]:<38} {cat_tag}")

        if res["status"] == "fail" and res.get("category") == "rate_limited":
            consecutive_fails += 1
        else:
            consecutive_fails = 0

        # Skipped items made no request, and after the final job there is
        # nothing left to space out, so neither needs a delay.
        if res["status"] == "skip" or i == n_ig:
            continue
        if consecutive_fails >= 3:
            sleep_s = 30 + random.uniform(0, 15)
            print(f" ⏸ rate-limit backoff: {sleep_s:.0f}s")
        else:
            sleep_s = 2 + random.uniform(0, 2)
        time.sleep(sleep_s)

    if log_path:
        Path(log_path).parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the log does not depend on the locale.
        with open(log_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)

    # Summary
    print(f"\n{'═'*60}")
    print(f" Total time: {(time.perf_counter()-t0)/60:.1f} min")
    print(f" Status: {Counter(r['status'] for r in results)}")
    for lbl in ["good", "okish", "bad"]:
        folder = Path(out_root) / lbl
        # Counts every .mp4 present in the folder, including earlier runs.
        n_ok = len(list(folder.glob("*.mp4"))) if folder.exists() else 0
        print(f" {lbl:<6} {n_ok:>3} downloaded")
    print(f"{'═'*60}")

    return results