| """ |
| Extended Dataset Builder — YouTube scraping + ESC-50 + expanded synthetic |
| ========================================================================= |
| Run this on your LOCAL machine or Google Colab (YouTube is blocked in HF Spaces). |
| |
| This extends the existing dataset at Ellaft/pc-fault-real-dataset with: |
| 1. YouTube scraped audio + video frames (real PC fault sounds/screens) |
| 2. ESC-50 environmental sounds (real recordings mapped to fault classes) |
| 3. More synthetic data (500 per class instead of 300) |
| |
| Usage: |
| pip install yt-dlp Pillow numpy scipy librosa soundfile datasets huggingface_hub imageio-ffmpeg scikit-learn |
| sudo apt install ffmpeg # needed for YouTube frame extraction + ESC-50 audio decoding |
| |
| python extend_dataset.py # Full run with YouTube + everything |
| """ |
|
|
| import os, sys, json, random, glob, shutil, subprocess |
| import numpy as np |
| from pathlib import Path |
| from PIL import Image, ImageDraw |
| from collections import Counter, defaultdict |
|
|
# Sample rate (Hz) used when writing the synthetic WAV files.
SAMPLE_RATE = 16_000
# Length (seconds) every synthetic clip is padded or truncated to.
AUDIO_DURATION = 5.0
# (width, height) every synthetic image is resized to before it is saved.
IMAGE_SIZE = (224, 224)

# Integer label -> fault-class name; a name's position below is its label id.
FAULT_CLASSES = dict(enumerate((
    "normal_operation",
    "boot_failure",
    "overheating_fan",
    "storage_failure",
    "system_crash",
)))
# Reverse lookup: fault-class name -> integer label.
FAULT_NAME_TO_ID = {name: label for label, name in FAULT_CLASSES.items()}
|
|
# Search phrases handed to yt-dlp's ``ytsearch`` for each fault class.
# Keys are the fault-class names; each value is the list of queries for it.
YOUTUBE_QUERIES = dict(
    normal_operation=[
        "quiet gaming PC idle fan noise ambient",
        "silent PC build running quiet computer sound",
        "computer fan white noise 1 hour",
    ],
    boot_failure=[
        "BIOS beep codes sounds AMI Award",
        "computer beep codes troubleshooting POST",
        "PC won't boot beeping sound motherboard",
        "UEFI boot failure beep codes",
    ],
    overheating_fan=[
        "loud PC fan noise grinding bearing failure",
        "CPU fan rattling noise overheating",
        "laptop fan very loud overheating spinning fast",
        "PC fan bearing failure wobble noise",
    ],
    storage_failure=[
        "hard drive clicking noise dying HDD sound",
        "HDD click of death failing hard drive",
        "hard drive failure sounds different brands",
        "hard disk clicking grinding seek failure",
    ],
    system_crash=[
        "Windows blue screen of death BSOD live crash",
        "computer crash freeze blue screen error",
        "Windows 10 BSOD crash compilation",
        "PC kernel panic crash screen recording",
    ],
)
|
|
|
|
| def find_ffmpeg(): |
| try: |
| r = subprocess.run(["ffmpeg", "-version"], capture_output=True, timeout=5) |
| if r.returncode == 0: return "ffmpeg" |
| except: pass |
| try: |
| import imageio_ffmpeg |
| return imageio_ffmpeg.get_ffmpeg_exe() |
| except: pass |
| return None |
|
|
|
|
def scrape_youtube_class(fault_class, queries, output_dir, max_videos_per_query=5):
    """Download YouTube audio and video frames for one fault class.

    For every query, yt-dlp is invoked twice: once to extract WAV audio into
    ``output_dir/yt_audio/<fault_class>`` and once to fetch a low-quality
    video into ``output_dir/yt_frames/<fault_class>``. Downloaded ``.mp4`` /
    ``.webm`` files are then turned into 224x224 JPEG frames (one every two
    seconds) with ffmpeg and deleted.

    Args:
        fault_class: Fault-class name; used as the sub-directory name.
        queries: Iterable of YouTube search phrases.
        output_dir: ``pathlib.Path`` root of the extended dataset.
        max_videos_per_query: Search-result and download cap per query.

    Returns:
        ``(n_audio, n_frames)``: the number of ``*.wav`` and ``*.jpg`` files
        present in the two class directories after the run.
    """
    audio_dir = output_dir / "yt_audio" / fault_class
    frames_dir = output_dir / "yt_frames" / fault_class
    audio_dir.mkdir(parents=True, exist_ok=True)
    frames_dir.mkdir(parents=True, exist_ok=True)

    # Options shared by the audio and the video invocation.
    common_opts = [
        "--match-filter", "duration<180",
        "--max-downloads", str(max_videos_per_query),
        "--no-playlist", "--quiet", "--no-warnings",
    ]
    # (label used in error messages, format-specific options, destination dir)
    jobs = (
        ("audio", ["--extract-audio", "--audio-format", "wav", "--audio-quality", "0"], audio_dir),
        ("video", ["--format", "worst[ext=mp4]/worst"], frames_dir),
    )

    for q in queries:
        print(f" [{fault_class}] Searching: '{q}'")
        search = f"ytsearch{max_videos_per_query}:{q}"
        for label, format_opts, dest in jobs:
            cmd = ["yt-dlp", search, *format_opts, *common_opts,
                   "-o", str(dest / "%(id)s.%(ext)s")]
            try:
                # yt-dlp's exit status is intentionally ignored (best effort).
                subprocess.run(cmd, timeout=300, capture_output=True)
            except Exception as e:
                print(f" {label} error: {e}")

    ffmpeg_bin = find_ffmpeg()
    if not ffmpeg_bin:
        # Without ffmpeg no frames can be produced; the videos are left on disk.
        print(f" [{fault_class}] ffmpeg not found; skipping frame extraction")
    else:
        videos = [*frames_dir.glob("*.mp4"), *frames_dir.glob("*.webm")]
        for vf in videos:
            try:
                subprocess.run([ffmpeg_bin, "-i", str(vf), "-vf", "fps=0.5,scale=224:224",
                                "-q:v", "2", "-y", "-loglevel", "error",
                                str(frames_dir / f"{vf.stem}_frame_%04d.jpg")],
                               timeout=60, capture_output=True)
            except (OSError, subprocess.SubprocessError) as e:
                # Report instead of silently dropping the video; keep going.
                print(f" frame error ({vf.name}): {e}")
            # The source video is removed whether or not extraction succeeded.
            vf.unlink(missing_ok=True)

    n_audio = len(list(audio_dir.glob("*.wav")))
    n_frames = len(list(frames_dir.glob("*.jpg")))
    print(f" [{fault_class}] got {n_audio} audio, {n_frames} frames")
    return n_audio, n_frames
|
|
|
|
def run_youtube_scraping(output_dir, max_videos_per_query=5):
    """Scrape YouTube for every fault class listed in ``YOUTUBE_QUERIES``.

    Args:
        output_dir: ``pathlib.Path`` root passed through to
            ``scrape_youtube_class``.
        max_videos_per_query: Per-query download cap passed through unchanged.
    """
    rule = "=" * 60
    print(f"\n{rule}\nYOUTUBE SCRAPING\n{rule}")
    for fault_class in YOUTUBE_QUERIES:
        scrape_youtube_class(
            fault_class,
            YOUTUBE_QUERIES[fault_class],
            output_dir,
            max_videos_per_query,
        )
|
|
|
|
# ESC-50 category name -> fault-class name it is relabelled as.
# Written grouped by fault class; categories not listed here are skipped by
# the ESC-50 import.
ESC50_MAPPING = {
    category: fault_class
    for fault_class, categories in (
        ("overheating_fan", ("vacuum_cleaner", "engine", "washing_machine")),
        ("boot_failure", ("clock_alarm", "siren")),
        ("storage_failure", ("clock_tick", "door_wood_knock", "hand_saw")),
        ("system_crash", ("glass_breaking", "fireworks", "chainsaw")),
        ("normal_operation", ("keyboard_typing", "mouse_click")),
    )
    for category in categories
}
|
|
|
|
def download_esc50(output_dir, max_per_class=80):
    """Export mapped ESC-50 clips as WAV files under ``output_dir/esc50_audio``.

    Loads the ``ashraq/esc50`` train split, keeps only samples whose category
    appears in ``ESC50_MAPPING`` and writes each one as
    ``<fault_class>_esc50_<index>.wav`` at the sample's own sampling rate.

    Args:
        output_dir: ``pathlib.Path`` root of the extended dataset.
        max_per_class: Maximum number of clips written per fault class.

    Returns:
        Dict of fault-class name -> number of clips written, or ``{}`` when
        the dataset could not be loaded.
    """
    rule = "=" * 60
    print(f"\n{rule}\nESC-50 DATASET\n{rule}")
    import soundfile as sf
    from datasets import load_dataset

    audio_dir = output_dir / "esc50_audio"
    audio_dir.mkdir(parents=True, exist_ok=True)

    try:
        ds = load_dataset("ashraq/esc50", split="train")
        print(f" Loaded {len(ds)} samples")
    except Exception as e:
        print(f" Failed: {e}")
        return {}

    written = defaultdict(int)
    for sample in ds:
        fault = ESC50_MAPPING.get(sample["category"])
        # Skip unmapped categories and classes that already hit the cap.
        if fault is None or written[fault] >= max_per_class:
            continue
        clip = sample["audio"]
        samples = np.array(clip["array"], dtype=np.float32)
        target = audio_dir / f"{fault}_esc50_{written[fault]:04d}.wav"
        sf.write(str(target), samples, clip["sampling_rate"])
        written[fault] += 1

    for fault, total in sorted(written.items()):
        print(f" {fault}: {total}")
    return dict(written)
|
|
|
|
def download_cooling_fans(output_dir, max_per_class=500):
    """Copy cooling-fan recordings into ``output_dir/hf_audio``.

    Downloads a snapshot of the ``HenriqueFrancaa/cooling-fans-db0`` dataset
    and copies its WAV files, renamed by label: files whose path (relative to
    the snapshot root) contains ``abnormal`` become ``overheating_fan_cfan_*``,
    files containing ``normal`` become ``normal_operation_cfan_*``. Any failure
    is reported and swallowed so the rest of the pipeline can continue.

    Args:
        output_dir: ``pathlib.Path`` root of the extended dataset.
        max_per_class: Maximum number of files copied per label.
    """
    print("\n" + "="*60 + "\nCOOLING FANS\n" + "="*60)
    audio_dir = output_dir / "hf_audio"
    audio_dir.mkdir(parents=True, exist_ok=True)
    try:
        from huggingface_hub import snapshot_download
        path = snapshot_download("HenriqueFrancaa/cooling-fans-db0", repo_type="dataset")
        # Sorted so the numbering of the copied files is reproducible.
        wav_files = sorted(glob.glob(os.path.join(path, "**/*.wav"), recursive=True))
        copied = {"normal_operation": 0, "overheating_fan": 0}
        for wf in wav_files:
            # Match on the path inside the snapshot only, so directory names
            # above the snapshot root cannot influence the label.
            rel = os.path.relpath(wf, path).lower()
            # "abnormal" contains "normal", so it must be tested first AND the
            # label must be fixed before the cap is checked; otherwise abnormal
            # files past the cap would be copied as normal_operation.
            if "abnormal" in rel:
                label = "overheating_fan"
            elif "normal" in rel:
                label = "normal_operation"
            else:
                continue
            if copied[label] >= max_per_class:
                continue
            shutil.copy2(wf, audio_dir / f"{label}_cfan_{copied[label]:04d}.wav")
            copied[label] += 1
        print(f" {copied['normal_operation']} normal, {copied['overheating_fan']} abnormal")
    except Exception as e:
        print(f" Failed: {e}")
|
|
|
|
def _synthesize_beep_pattern(pattern, freq=1000, jitter=0.1):
    """Render a beep/silence pattern as a fixed-length float32 waveform.

    Args:
        pattern: Iterable of ``(duration_ms, is_beep)`` pairs. Each duration is
            scaled by a random factor in ``[1 - jitter, 1 + jitter]``.
        freq: Beep frequency in Hz.
        jitter: Maximum relative deviation applied to every duration.

    Returns:
        A float32 array of ``int(SAMPLE_RATE * AUDIO_DURATION)`` samples. The
        rendered pattern is repeated until it fills that length, or truncated
        if it is longer. An empty (or zero-length) pattern yields silence.
    """
    tgt = int(SAMPLE_RATE * AUDIO_DURATION)
    segments = []
    for dur, is_beep in pattern:
        actual = dur * (1 + random.uniform(-jitter, jitter))
        n = int(SAMPLE_RATE * actual / 1000)
        if not is_beep:
            segments.append(np.zeros(n, dtype=np.float32))
            continue
        t = np.linspace(0, actual / 1000, n)
        # 70% square wave + 30% sine at the same frequency, each at half scale.
        b = 0.7*(0.5*np.sign(np.sin(2*np.pi*freq*t))) + 0.3*(0.5*np.sin(2*np.pi*freq*t))
        env = np.ones(n)
        a = min(200, n // 4)
        # Linear fade-in/out. Skipped when a == 0 (fewer than 4 samples):
        # env[-0:] would address the whole array and the assignment would fail.
        if a:
            env[:a] = np.linspace(0, 1, a)
            env[-a:] = np.linspace(1, 0, a)
        segments.append((b * env).astype(np.float32))

    r = np.concatenate(segments) if segments else np.zeros(0, dtype=np.float32)
    if len(r) == 0:
        # Nothing to repeat; avoid the division by zero in the tiling below.
        return np.zeros(tgt, dtype=np.float32)
    if len(r) < tgt:
        r = np.tile(r, tgt // len(r) + 1)[:tgt]
    else:
        r = r[:tgt]
    return r
|
|
|
|
def generate_synth_audio(output_dir, n=500):
    """Write synthetic 16-bit WAV clips into ``output_dir/synth_audio``.

    Four families are generated, each ``AUDIO_DURATION`` seconds long:

    * ``boot_failure_beep_*``: six beep patterns, ``n // 6`` clips each
      (so the total can be slightly below ``n``), with added noise.
    * ``normal_operation_beep_*``: ``n // 10`` single short beeps.
    * ``system_crash_synth_*``: ``n`` clips, each randomly one of
      burst / glitch / hang / feedback.
    * ``storage_failure_hdd_*``: ``n`` clips of a low hum plus decaying clicks.

    Args:
        output_dir: ``pathlib.Path`` root of the extended dataset.
        n: Nominal number of clips per family.
    """
    rule = "=" * 60
    print(f"\n{rule}\nSYNTHETIC AUDIO\n{rule}")
    import scipy.io.wavfile as wav

    synth_dir = output_dir / "synth_audio"
    synth_dir.mkdir(parents=True, exist_ok=True)
    total = int(SAMPLE_RATE * AUDIO_DURATION)

    def save(filename, clipped):
        # `clipped` must already lie in [-1, 1]; it is scaled to int16 PCM.
        wav.write(str(synth_dir / filename), SAMPLE_RATE, (clipped * 32767).astype(np.int16))

    # (duration_ms, is_beep) sequences used for the boot-failure beeps.
    beep_patterns = (
        [(800, True), (200, False), (200, True), (200, False)],
        [(200, True), (100, False)] * 3,
        [(200, True), (100, False)] * 5,
        [(800, True), (200, False), (800, True), (200, False), (200, True)],
        [(800, True), (300, False)],
        [(200, True), (300, False), (200, True), (300, False), (200, True), (500, False), (200, True)],
    )

    beep_count = 0
    clips_per_pattern = n // len(beep_patterns)
    for pattern in beep_patterns:
        for _ in range(clips_per_pattern):
            tone = _synthesize_beep_pattern(pattern, random.uniform(800, 1200), 0.15)
            hiss = np.random.randn(total) * random.uniform(0.005, 0.02)
            save(f"boot_failure_beep_{beep_count:04d}.wav", np.clip(tone + hiss, -1, 1))
            beep_count += 1

    for idx in range(n // 10):
        tone = _synthesize_beep_pattern([(200, True), (300, False)], random.uniform(900, 1100), 0.1)
        hiss = np.random.randn(total) * 0.005
        save(f"normal_operation_beep_{idx:04d}.wav", np.clip(tone + hiss, -1, 1))

    for idx in range(n):
        kind = random.choice(["burst", "glitch", "hang", "feedback"])
        if kind == "burst":
            # Silence with one exponentially decaying noise burst.
            signal = np.zeros(total, np.float32)
            start = random.randint(0, total // 2)
            length = min(random.randint(SAMPLE_RATE // 4, SAMPLE_RATE * 2), total - start)
            burst = np.random.randn(length) * random.uniform(0.3, 0.8) * np.exp(-np.linspace(0, 3, length))
            signal[start:start + length] = burst.astype(np.float32)
        elif kind == "glitch":
            # A short noise chunk repeated for the whole clip.
            chunk = np.random.randn(random.randint(50, 500)).astype(np.float32) * 0.3
            signal = np.tile(chunk, total // len(chunk) + 1)[:total]
        elif kind == "hang":
            # Loud noise at the start followed by a quiet noise floor.
            signal = np.random.randn(total).astype(np.float32) * 0.01
            loud_len = random.randint(SAMPLE_RATE // 4, SAMPLE_RATE)
            signal[:loud_len] = np.random.randn(loud_len) * 0.2
        else:
            # "feedback": a sine tone whose amplitude ramps up linearly.
            tone_hz = random.uniform(200, 2000)
            timeline = np.linspace(0, AUDIO_DURATION, total)
            signal = (0.4 * np.sin(2 * np.pi * tone_hz * timeline) * np.linspace(0.1, 1, total)).astype(np.float32)
        save(f"system_crash_synth_{idx:04d}.wav", np.clip(signal, -1, 1))

    for idx in range(n):
        signal = np.zeros(total, np.float32)
        hum_hz = random.uniform(40, 80)
        timeline = np.linspace(0, AUDIO_DURATION, total)
        # In-place add keeps `signal` float32.
        signal += random.uniform(0.02, 0.08) * np.sin(2 * np.pi * hum_hz * timeline)
        interval = random.uniform(0.3, 1.5)
        n_clicks = int(AUDIO_DURATION / interval)
        click_len = int(SAMPLE_RATE * random.uniform(0.005, 0.02))
        for k in range(n_clicks):
            jittered = int(k * interval * SAMPLE_RATE) + random.randint(-100, 100)
            # Clamp so the click always fits inside the clip.
            pos = max(0, min(jittered, total - click_len))
            click = np.random.randn(click_len) * random.uniform(0.2, 0.6) * np.exp(-np.linspace(0, 8, click_len))
            signal[pos:pos + click_len] += click.astype(np.float32)
        save(f"storage_failure_hdd_{idx:04d}.wav", np.clip(signal, -1, 1))

    print(f" Done: {beep_count} beeps, {n // 10} normal, {n} crash, {n} HDD")
|
|
|
|
def generate_synth_images(output_dir, n=500):
    """Render ``n`` synthetic screen images per fault class.

    Each image is drawn by the class's renderer, resized to ``IMAGE_SIZE``
    and saved as ``output_dir/synth_images/<class>/<class>_<index>.jpg`` with
    a random JPEG quality between 80 and 95.

    Args:
        output_dir: ``pathlib.Path`` root of the extended dataset.
        n: Number of images generated for every class.
    """
    rule = "=" * 60
    print(f"\n{rule}\nSYNTHETIC IMAGES\n{rule}")

    stop_codes = [
        ("0x0000007E", "SYSTEM_THREAD_EXCEPTION_NOT_HANDLED"),
        ("0x0000003B", "SYSTEM_SERVICE_EXCEPTION"),
        ("0x000000EF", "CRITICAL_PROCESS_DIED"),
        ("0x00000133", "DPC_WATCHDOG_VIOLATION"),
    ]
    post_errors = [
        "ERROR: Boot device not found",
        "CMOS checksum error",
        "NTLDR is missing",
        "DISK BOOT FAILURE",
        "Reboot and Select proper Boot device",
    ]
    smart_errors = [
        "SMART Failure Predicted",
        "Reallocated Sector Count: 1624 (CRITICAL)",
        "Windows found 208 bad sectors",
        "Disk read error occurred",
    ]

    # (class name, renderer, positional arguments for the renderer)
    renderers = (
        ("system_crash", _bsod, (stop_codes,)),
        ("boot_failure", _post, (post_errors,)),
        ("overheating_fan", _therm, ()),
        ("storage_failure", _stor, (smart_errors,)),
        ("normal_operation", _norm, ()),
    )
    for cls, render, args in renderers:
        class_dir = output_dir / "synth_images" / cls
        class_dir.mkdir(parents=True, exist_ok=True)
        for idx in range(n):
            thumbnail = render(*args).resize(IMAGE_SIZE, Image.LANCZOS)
            thumbnail.save(str(class_dir / f"{cls}_{idx:04d}.jpg"), quality=random.randint(80, 95))
        print(f" {cls}: {n}")
|
|
def _bsod(E):
    """Draw a 1920x1080 blue-screen image for a random entry of ``E``.

    ``E`` is a sequence of ``(hex_code, stop_name)`` pairs; only the stop name
    is rendered. Returns the PIL image.
    """
    w, h = 1920, 1080
    _hex_code, stop_name = random.choice(E)
    background = random.choice(["#0078D7", "#000080"])
    img = Image.new("RGB", (w, h), background)
    canvas = ImageDraw.Draw(img)

    canvas.text((int(w * .05), int(h * .08)), ":(", fill="white")
    canvas.text((int(w * .05), int(h * .25)), "Your PC ran into a problem.", fill="white")
    canvas.text((int(w * .05), int(h * .35)), f"{random.randint(0,100)}% complete", fill="white")
    canvas.text((int(w * .15), int(h * .72)), f"Stop code: {stop_name}", fill="white")
    return img
def _post(E):
    """Draw a black BIOS/POST screen showing a random error message from ``E``.

    The canvas is 1920x1080 or 1024x768, chosen at random. Returns the PIL image.
    """
    size = random.choice([(1920, 1080), (1024, 768)])
    img = Image.new("RGB", size, "#000000")
    canvas = ImageDraw.Draw(img)

    vendor = random.choice(["AMI BIOS", "Award BIOS", "Phoenix BIOS"])
    canvas.text((20, 20), vendor, fill="#AAAAAA")
    canvas.text((20, 50), "CPU: Intel Core i7-12700K", fill="#AAFFAA")
    memory_mb = random.choice([8192, 16384, 32768])
    canvas.text((20, 80), f"Memory Test: {memory_mb}MB OK", fill="#AAFFAA")
    canvas.text((20, 140), random.choice(E), fill="#FF0000")
    canvas.text((20, 180), "Press F1 to Resume", fill="#FFFFFF")
    return img
def _therm():
    """Draw a 1920x1080 overheating warning in one of three random layouts.

    Layouts: a hardware-monitor window ("hw"), a BIOS warning ("bios"), or a
    desktop notification popup ("pop"). Returns the PIL image.
    """
    w, h = 1920, 1080
    layout = random.choice(["hw", "bios", "pop"])

    if layout == "hw":
        img = Image.new("RGB", (w, h), "#F0F0F0")
        canvas = ImageDraw.Draw(img)
        canvas.rectangle([0, 0, w, 40], fill="#0078D4")
        canvas.text((10, 10), "HW Monitor - CRITICAL", fill="white")
        top = 60
        # One red reading per sensor, 35 px apart.
        for row, sensor in enumerate(["CPU", "GPU", "VRM"]):
            canvas.text((20, top + row * 35), f"{sensor}: {random.randint(88,110)}°C [CRITICAL]", fill="#FF0000")
        return img

    if layout == "bios":
        img = Image.new("RGB", (w, h), "#000000")
        canvas = ImageDraw.Draw(img)
        canvas.text((w // 4, h // 4), "*** WARNING ***", fill="#FF0000")
        canvas.text((w // 4, h // 4 + 40), f"CPU: {random.randint(95,110)}°C", fill="#FF0000")
        canvas.text((w // 4, h // 4 + 80), "System shutting down.", fill="#FFAA00")
        return img

    # "pop": dark desktop with a notification box near the bottom-right corner.
    img = Image.new("RGB", (w, h), "#1E1E1E")
    canvas = ImageDraw.Draw(img)
    box_x = w - 420
    box_y = h - 210
    canvas.rectangle([box_x, box_y, box_x + 400, box_y + 150], fill="#2D2D2D", outline="#FF6600", width=2)
    canvas.text((box_x + 15, box_y + 10), "Critical Temperature", fill="#FF6600")
    canvas.text((box_x + 15, box_y + 40), f"CPU: {random.randint(90,105)}°C", fill="#FF0000")
    return img
def _stor(E):
    """Draw a black storage-failure screen: a SMART warning or a CHKDSK log.

    The canvas is 1920x1080 or 1280x1024, chosen at random. For the SMART
    layout a random message from ``E`` is shown. Returns the PIL image.
    """
    w, h = random.choice([(1920, 1080), (1280, 1024)])
    img = Image.new("RGB", (w, h), "#000000")
    canvas = ImageDraw.Draw(img)

    layout = random.choice(["smart", "chkdsk"])
    if layout == "smart":
        left, top = w // 6, h // 3
        canvas.text((left, top), "WARNING:", fill="#FFAA00")
        canvas.text((left, top + 40), random.choice(E), fill="#FF0000")
        canvas.text((left, top + 80), "Back up immediately.", fill="#FFFFFF")
    else:
        canvas.text((20, 20), "Checking file system on C:", fill="#CCCCCC")
        canvas.text((20, 60), f"CHKDSK stage {random.randint(1,5)}/5... {random.randint(5,95)}%", fill="#FFFFFF")
        canvas.text((20, 100), f"Found {random.randint(1,500)} bad sectors.", fill="#FF4444")
    return img
def _norm():
    """Draw a 1920x1080 "healthy desktop": solid wallpaper, taskbar and clock.

    Returns the PIL image.
    """
    w, h = 1920, 1080
    wallpaper = random.choice(["#0078D4", "#1B5E20", "#283593", "#004D40", "#263238"])
    img = Image.new("RGB", (w, h), wallpaper)
    canvas = ImageDraw.Draw(img)

    # 48 px taskbar along the bottom edge with a clock at its right end.
    canvas.rectangle([0, h - 48, w, h], fill="#1F1F1F")
    clock = f"{random.randint(1,12):02d}:{random.randint(0,59):02d} PM"
    canvas.text((w - 100, h - 35), clock, fill="#FFFFFF")
    return img
|
|
|
|
def build_and_upload(output_dir, max_per_class=500):
    """Assemble all collected files into a dataset and push it to the Hub.

    Audio is gathered from ``synth_audio``, ``hf_audio`` and ``esc50_audio``
    (class taken from the file-name prefix) and from ``yt_audio/<class>``;
    images from ``synth_images/<class>`` and ``yt_frames/<class>``. For each
    class up to ``max_per_class`` samples are built by pairing audio and image
    files by index (the shorter list is cycled). The result is split 70/15/15
    into train/validation/test, stratified by class, and pushed to
    ``Ellaft/pc-fault-real-dataset``.

    Args:
        output_dir: ``pathlib.Path`` root of the extended dataset.
        max_per_class: Maximum number of samples built per fault class.
    """
    print("\n" + "="*60 + "\nBUILDING + UPLOADING\n" + "="*60)

    def add_class_dirs(root, pattern, bucket):
        # Collect files from root/<fault class>/ sub-directories, if root exists.
        if not root.exists():
            return
        for cd in root.iterdir():
            if cd.is_dir() and cd.name in FAULT_NAME_TO_ID:
                bucket[cd.name].extend(str(f) for f in cd.glob(pattern))

    audio_by_class = defaultdict(list)
    for sub in ("synth_audio", "hf_audio", "esc50_audio"):
        dd = output_dir / sub
        if not dd.exists():
            continue
        for f in dd.rglob("*.wav"):
            stem = f.stem.lower()
            # These folders are flat; the class is the file-name prefix.
            for cn in FAULT_NAME_TO_ID:
                if stem.startswith(cn):
                    audio_by_class[cn].append(str(f))
                    break
    add_class_dirs(output_dir / "yt_audio", "*.wav", audio_by_class)

    image_by_class = defaultdict(list)
    # synth_images gets the same existence check as every other source.
    add_class_dirs(output_dir / "synth_images", "*.jpg", image_by_class)
    add_class_dirs(output_dir / "yt_frames", "*.jpg", image_by_class)

    print("Audio:")
    for c in FAULT_CLASSES.values():
        print(f" {c}: {len(audio_by_class[c])}")
    print("Images:")
    for c in FAULT_CLASSES.values():
        print(f" {c}: {len(image_by_class[c])}")

    all_samples = []
    for cn, cid in FAULT_NAME_TO_ID.items():
        au, im = audio_by_class[cn], image_by_class[cn]
        if not au and not im:
            continue
        # The lists are filled synthetic-first. Without shuffling, the cap
        # below would only ever select the leading (synthetic) files and the
        # ESC-50 / cooling-fan / YouTube material would never be used.
        random.shuffle(au)
        random.shuffle(im)
        n = min(max(len(au), len(im)), max_per_class)
        for i in range(n):
            audio_path = au[i % len(au)] if au else None
            image_path = im[i % len(im)] if im else None
            src = "youtube" if audio_path and "yt_" in audio_path else "mixed"
            all_samples.append({"fault_class": cid, "fault_name": cn,
                                "audio_path": audio_path, "image_path": image_path,
                                "source": src})
    random.shuffle(all_samples)
    print(f"\nTotal: {len(all_samples)}")
    dist = Counter(s["fault_name"] for s in all_samples)
    for c, n in sorted(dist.items()):
        print(f" {c}: {n}")

    if not all_samples:
        # Nothing was collected; a stratified split of an empty set would fail.
        print("No samples found; nothing to upload.")
        return

    from datasets import Dataset, Audio, DatasetDict, ClassLabel
    from datasets import Image as HFImage
    data = {"audio": [], "image": [], "fault_class": [], "fault_name": [], "source": []}
    for s in all_samples:
        data["fault_class"].append(s["fault_class"])
        data["fault_name"].append(s["fault_name"])
        data["source"].append(s["source"])
        data["audio"].append(s.get("audio_path"))
        data["image"].append(s.get("image_path"))
    ds = Dataset.from_dict(data)
    ds = ds.cast_column("audio", Audio(sampling_rate=SAMPLE_RATE))
    ds = ds.cast_column("image", HFImage())
    # ClassLabel is what makes stratify_by_column usable below.
    ds = ds.cast_column("fault_class", ClassLabel(names=list(FAULT_CLASSES.values())))
    # 70% train, then the remaining 30% halved into validation and test.
    sp = ds.train_test_split(test_size=0.3, seed=42, stratify_by_column="fault_class")
    tv = sp["test"].train_test_split(test_size=0.5, seed=42, stratify_by_column="fault_class")
    final = DatasetDict({"train": sp["train"], "validation": tv["train"], "test": tv["test"]})
    print(f"Train:{len(final['train'])}, Val:{len(final['validation'])}, Test:{len(final['test'])}")
    final.push_to_hub("Ellaft/pc-fault-real-dataset", private=False)
    print("Uploaded to https://huggingface.co/datasets/Ellaft/pc-fault-real-dataset")
|
|
|
|
if __name__ == "__main__":
    # Everything is staged under ./dataset_extended before the upload.
    output_dir = Path("./dataset_extended")
    output_dir.mkdir(exist_ok=True)

    # 1. Real-world sources.
    run_youtube_scraping(output_dir, max_videos_per_query=5)
    download_cooling_fans(output_dir, max_per_class=500)
    try:
        download_esc50(output_dir, max_per_class=80)
    except Exception as e:
        # ESC-50 is optional: report and carry on with the other sources.
        print(f"ESC-50 skipped: {e}")

    # 2. Synthetic sources.
    generate_synth_audio(output_dir, n=500)
    generate_synth_images(output_dir, n=500)

    # 3. Assemble, split and push.
    build_and_upload(output_dir, max_per_class=500)
|
|