# multimodal-pc-fault-detector / data / extend_dataset.py
# Author: Ellaft
# Commit message: Add extend_dataset.py: YouTube scraping + ESC-50 + expanded synthetic (run locally for YouTube)
# Commit: 8d385d0 (verified)
"""
Extended Dataset Builder — YouTube scraping + ESC-50 + expanded synthetic
=========================================================================
Run this on your LOCAL machine or Google Colab (YouTube is blocked in HF Spaces).
This extends the existing dataset at Ellaft/pc-fault-real-dataset with:
1. YouTube scraped audio + video frames (real PC fault sounds/screens)
2. ESC-50 environmental sounds (real recordings mapped to fault classes)
3. More synthetic data (500 per class instead of 300)
Usage:
    pip install yt-dlp Pillow numpy scipy librosa soundfile datasets huggingface_hub imageio-ffmpeg scikit-learn
    sudo apt install ffmpeg  # needed for YouTube frame extraction + ESC-50 audio decoding
    python extend_dataset.py  # Full run with YouTube + everything
"""
import os, sys, json, random, glob, shutil, subprocess
import numpy as np
from pathlib import Path
from PIL import Image, ImageDraw
from collections import Counter, defaultdict
# Sample rate (Hz) and clip length (seconds) used when synthesizing audio.
SAMPLE_RATE = 16_000
AUDIO_DURATION = 5.0
# Size every synthetic screenshot is resized to before saving.
IMAGE_SIZE = (224, 224)

# Integer label -> fault class name; the position in the tuple is the label.
FAULT_CLASSES = dict(enumerate((
    "normal_operation",
    "boot_failure",
    "overheating_fan",
    "storage_failure",
    "system_crash",
)))
# Reverse lookup: fault class name -> integer label.
FAULT_NAME_TO_ID = {name: class_id for class_id, name in FAULT_CLASSES.items()}
# YouTube search strings per fault class. Each string is passed to yt-dlp as a
# ``ytsearchN:<query>`` search by scrape_youtube_class(); the keys are the fault
# class names used as sub-directory names for the downloaded audio and frames.
# NOTE(review): scrape_youtube_class() passes ``--match-filter duration<180``,
# so the "1 hour" query below is unlikely to yield any download — confirm
# whether that query is still wanted.
YOUTUBE_QUERIES = {
    "normal_operation": [
        "quiet gaming PC idle fan noise ambient",
        "silent PC build running quiet computer sound",
        "computer fan white noise 1 hour",
    ],
    "boot_failure": [
        "BIOS beep codes sounds AMI Award",
        "computer beep codes troubleshooting POST",
        "PC won't boot beeping sound motherboard",
        "UEFI boot failure beep codes",
    ],
    "overheating_fan": [
        "loud PC fan noise grinding bearing failure",
        "CPU fan rattling noise overheating",
        "laptop fan very loud overheating spinning fast",
        "PC fan bearing failure wobble noise",
    ],
    "storage_failure": [
        "hard drive clicking noise dying HDD sound",
        "HDD click of death failing hard drive",
        "hard drive failure sounds different brands",
        "hard disk clicking grinding seek failure",
    ],
    "system_crash": [
        "Windows blue screen of death BSOD live crash",
        "computer crash freeze blue screen error",
        "Windows 10 BSOD crash compilation",
        "PC kernel panic crash screen recording",
    ],
}
def find_ffmpeg():
    """Locate an ffmpeg executable.

    Probes ``ffmpeg`` on PATH by running ``ffmpeg -version``; if that does not
    succeed, falls back to the binary reported by the optional
    ``imageio_ffmpeg`` package.

    Returns:
        ``"ffmpeg"`` when the PATH probe exits with status 0, otherwise the
        path returned by ``imageio_ffmpeg.get_ffmpeg_exe()``, or ``None`` when
        neither source provides a binary.
    """
    try:
        probe = subprocess.run(["ffmpeg", "-version"], capture_output=True, timeout=5)
    except (OSError, subprocess.SubprocessError):
        # Not installed, not executable, or the probe timed out: use the fallback.
        # Deliberately narrower than a bare ``except`` so Ctrl-C still works.
        pass
    else:
        if probe.returncode == 0:
            return "ffmpeg"

    try:
        import imageio_ffmpeg
        return imageio_ffmpeg.get_ffmpeg_exe()
    except Exception:
        # Package missing or unable to supply a binary; the lookup is best-effort.
        return None
def scrape_youtube_class(fault_class, queries, output_dir, max_videos_per_query=5):
    """Download audio and video frames from YouTube for one fault class.

    For every query, yt-dlp is invoked twice: once to extract WAV audio into
    ``output_dir/yt_audio/<fault_class>`` and once to fetch a low-quality video
    into ``output_dir/yt_frames/<fault_class>``. Afterwards every downloaded
    ``.mp4``/``.webm`` file is turned into 224x224 JPEG frames with ffmpeg
    (``fps=0.5``) and the video file is deleted.

    Args:
        fault_class: Fault class name, used as the sub-directory name.
        queries: Iterable of YouTube search strings.
        output_dir: ``pathlib.Path`` of the dataset root directory.
        max_videos_per_query: Number of search results requested per query and
            the value passed to ``--max-downloads``.

    Returns:
        Tuple ``(n_audio, n_frames)``: the number of ``.wav`` files and
        ``.jpg`` files present in the two class directories afterwards.
    """
    audio_dir = output_dir / "yt_audio" / fault_class
    frames_dir = output_dir / "yt_frames" / fault_class
    audio_dir.mkdir(parents=True, exist_ok=True)
    frames_dir.mkdir(parents=True, exist_ok=True)

    # Options shared by the audio and the video download of each query.
    common_opts = [
        "--match-filter", "duration<180",
        "--max-downloads", str(max_videos_per_query),
        "--no-playlist", "--quiet", "--no-warnings",
    ]
    # (label used in error messages, mode-specific options, target directory)
    download_modes = [
        ("audio", ["--extract-audio", "--audio-format", "wav", "--audio-quality", "0"], audio_dir),
        ("video", ["--format", "worst[ext=mp4]/worst"], frames_dir),
    ]

    for q in queries:
        print(f" [{fault_class}] Searching: '{q}'")
        search = f"ytsearch{max_videos_per_query}:{q}"
        for label, mode_opts, target_dir in download_modes:
            cmd = ["yt-dlp", search, *mode_opts, *common_opts,
                   "-o", str(target_dir / "%(id)s.%(ext)s")]
            try:
                # The exit status is intentionally not checked: downloads are best-effort.
                subprocess.run(cmd, timeout=300, capture_output=True)
            except Exception as e:
                print(f" {label} error: {e}")

    ffmpeg_bin = find_ffmpeg()
    if ffmpeg_bin:
        for vf in list(frames_dir.glob("*.mp4")) + list(frames_dir.glob("*.webm")):
            try:
                subprocess.run([ffmpeg_bin, "-i", str(vf), "-vf", "fps=0.5,scale=224:224",
                                "-q:v", "2", "-y", "-loglevel", "error",
                                str(frames_dir / f"{vf.stem}_frame_%04d.jpg")],
                               timeout=60, capture_output=True)
            except Exception as e:
                # Report instead of silently swallowing; KeyboardInterrupt now propagates.
                print(f" frame extraction failed for {vf.name}: {e}")
            # The source video is removed whether or not extraction succeeded.
            vf.unlink(missing_ok=True)
    else:
        print(f" [{fault_class}] ffmpeg not found; skipping frame extraction")

    n_audio = len(list(audio_dir.glob("*.wav")))
    n_frames = len(list(frames_dir.glob("*.jpg")))
    print(f" [{fault_class}] got {n_audio} audio, {n_frames} frames")
    return n_audio, n_frames
def run_youtube_scraping(output_dir, max_videos_per_query=5):
    """Run scrape_youtube_class() once for every entry of YOUTUBE_QUERIES."""
    banner = "\n" + "=" * 60 + "\nYOUTUBE SCRAPING\n" + "=" * 60
    print(banner)
    for fault_class in YOUTUBE_QUERIES:
        class_queries = YOUTUBE_QUERIES[fault_class]
        scrape_youtube_class(fault_class, class_queries, output_dir, max_videos_per_query)
# ESC-50 categories grouped by the fault class they stand in for.
_ESC50_CATEGORIES_BY_FAULT = {
    "overheating_fan": ("vacuum_cleaner", "engine", "washing_machine"),
    "boot_failure": ("clock_alarm", "siren"),
    "storage_failure": ("clock_tick", "door_wood_knock", "hand_saw"),
    "system_crash": ("glass_breaking", "fireworks", "chainsaw"),
    "normal_operation": ("keyboard_typing", "mouse_click"),
}
# Flat lookup used by download_esc50(): ESC-50 category -> fault class name.
ESC50_MAPPING = {
    category: fault
    for fault, categories in _ESC50_CATEGORIES_BY_FAULT.items()
    for category in categories
}
def download_esc50(output_dir, max_per_class=80):
    """Export ESC-50 clips whose category appears in ESC50_MAPPING as WAV files.

    Loads the ``ashraq/esc50`` train split and writes each selected clip to
    ``output_dir/esc50_audio/<fault>_esc50_<NNNN>.wav`` at the clip's own
    sampling rate, keeping at most ``max_per_class`` clips per fault class.

    Returns:
        Dict mapping fault class name to the number of clips written, or an
        empty dict when the dataset could not be loaded.
    """
    print("\n" + "="*60 + "\nESC-50 DATASET\n" + "="*60)
    import soundfile as sf
    from datasets import load_dataset

    audio_dir = output_dir / "esc50_audio"
    audio_dir.mkdir(parents=True, exist_ok=True)

    try:
        ds = load_dataset("ashraq/esc50", split="train")
        print(f" Loaded {len(ds)} samples")
    except Exception as e:
        print(f" Failed: {e}")
        return {}

    written = defaultdict(int)
    for row in ds:
        fault = ESC50_MAPPING.get(row["category"])
        # Skip unmapped categories and classes that already reached their cap.
        if fault is None or written[fault] >= max_per_class:
            continue
        clip = row["audio"]
        samples = np.array(clip["array"], dtype=np.float32)
        target = audio_dir / f"{fault}_esc50_{written[fault]:04d}.wav"
        sf.write(str(target), samples, clip["sampling_rate"])
        written[fault] += 1

    for fault, count in sorted(written.items()):
        print(f" {fault}: {count}")
    return dict(written)
def download_cooling_fans(output_dir, max_per_class=500, source_dir=None):
    """Copy cooling-fan recordings into ``output_dir/hf_audio``.

    WAV files whose path (relative to the dataset root) contains "abnormal"
    are copied as ``overheating_fan_cfan_<NNNN>.wav``; files whose path
    contains "normal" are copied as ``normal_operation_cfan_<NNNN>.wav``.
    Any failure is reported with a printed message instead of being raised.

    Args:
        output_dir: ``pathlib.Path`` of the dataset root directory.
        max_per_class: Maximum number of files copied per class.
        source_dir: Optional local copy of the dataset. When ``None`` the
            ``HenriqueFrancaa/cooling-fans-db0`` snapshot is downloaded from
            the Hugging Face Hub.
    """
    print("\n" + "="*60 + "\nCOOLING FANS\n" + "="*60)
    audio_dir = output_dir / "hf_audio"
    audio_dir.mkdir(parents=True, exist_ok=True)
    try:
        if source_dir is None:
            from huggingface_hub import snapshot_download
            root = snapshot_download("HenriqueFrancaa/cooling-fans-db0", repo_type="dataset")
        else:
            root = str(source_dir)
        # Sorted so that the files kept under the cap do not depend on directory order.
        wav_files = sorted(glob.glob(os.path.join(root, "**/*.wav"), recursive=True))
        copied = {"normal_operation": 0, "overheating_fan": 0}
        for wf in wav_files:
            # Match on the path inside the dataset only, not on the cache location.
            rel = os.path.relpath(wf, root).lower()
            # Decide the label first and apply the cap afterwards. "abnormal"
            # contains "normal", so checking the cap inside the label test
            # would relabel abnormal files as normal once the cap is reached.
            if "abnormal" in rel:
                label = "overheating_fan"
            elif "normal" in rel:
                label = "normal_operation"
            else:
                continue
            if copied[label] >= max_per_class:
                continue
            shutil.copy2(wf, audio_dir / f"{label}_cfan_{copied[label]:04d}.wav")
            copied[label] += 1
        print(f" {copied['normal_operation']} normal, {copied['overheating_fan']} abnormal")
    except Exception as e:
        print(f" Failed: {e}")
def _synthesize_beep_pattern(pattern, freq=1000, jitter=0.1):
    """Render a beep/silence pattern into a fixed-length float32 waveform.

    Args:
        pattern: Iterable of ``(duration_ms, is_beep)`` pairs.
        freq: Beep frequency in Hz.
        jitter: Each duration is scaled by a random factor drawn from
            ``1 - jitter`` to ``1 + jitter``.

    Returns:
        Array of ``int(SAMPLE_RATE * AUDIO_DURATION)`` float32 samples. The
        rendered pattern is repeated to fill the clip, or truncated if longer.
        An empty or zero-length pattern yields silence.
    """
    tgt = int(SAMPLE_RATE * AUDIO_DURATION)
    segments = []
    for dur, is_beep in pattern:
        actual = dur * (1 + random.uniform(-jitter, jitter))
        n = int(SAMPLE_RATE * actual / 1000)
        if not is_beep:
            segments.append(np.zeros(n, dtype=np.float32))
            continue
        t = np.linspace(0, actual / 1000, n)
        phase = 2 * np.pi * freq * t
        # 70 % square wave plus 30 % sine wave, each at half amplitude.
        b = 0.7 * (0.5 * np.sign(np.sin(phase))) + 0.3 * (0.5 * np.sin(phase))
        env = np.ones(n)
        a = min(200, n // 4)
        # Linear fade-in/fade-out. Skipped when a == 0 because ``env[-0:]``
        # addresses the whole array and the assignment would fail.
        if a > 0:
            env[:a] = np.linspace(0, 1, a)
            env[-a:] = np.linspace(1, 0, a)
        segments.append((b * env).astype(np.float32))

    r = np.concatenate(segments) if segments else np.zeros(0, dtype=np.float32)
    if len(r) == 0:
        # Nothing to repeat: avoid dividing by zero below and return silence.
        return np.zeros(tgt, dtype=np.float32)
    if len(r) < tgt:
        return np.tile(r, tgt // len(r) + 1)[:tgt]
    return r[:tgt]
def generate_synth_audio(output_dir, n=500):
    """Write synthetic 16-bit WAV clips for four fault classes.

    Files are written to ``output_dir/synth_audio``:
      * ``boot_failure_beep_*``: ``n // 6`` clips for each of the six beep
        patterns in ``PATS``, with added noise.
      * ``normal_operation_beep_*``: ``n // 10`` clips of a single short beep
        pattern.
      * ``system_crash_synth_*``: ``n`` clips of a randomly chosen kind
        (burst, glitch, hang or feedback).
      * ``storage_failure_hdd_*``: ``n`` clips of a low hum with periodic
        clicks.
    """
    print("\n" + "="*60 + "\nSYNTHETIC AUDIO\n" + "="*60)
    import scipy.io.wavfile as wav
    d = output_dir / "synth_audio"; d.mkdir(parents=True, exist_ok=True)
    # Number of samples in one clip.
    tgt = int(SAMPLE_RATE*AUDIO_DURATION)
    # Beep patterns as (duration_ms, is_beep) pairs; the keys are not used.
    PATS = {"a":[(800,True),(200,False),(200,True),(200,False)], "b":[(200,True),(100,False)]*3,
            "c":[(200,True),(100,False)]*5, "d":[(800,True),(200,False),(800,True),(200,False),(200,True)],
            "e":[(800,True),(300,False)], "f":[(200,True),(300,False),(200,True),(300,False),(200,True),(500,False),(200,True)]}
    # c counts the boot-failure clips and numbers their file names.
    c = 0
    for _,p in PATS.items():
        # Integer division: the total can be slightly below n.
        for i in range(n//len(PATS)):
            # Beep pattern at 800-1200 Hz plus Gaussian noise, clipped to [-1, 1].
            a = np.clip(_synthesize_beep_pattern(p,random.uniform(800,1200),0.15)+np.random.randn(tgt)*random.uniform(0.005,0.02),-1,1)
            wav.write(str(d/f"boot_failure_beep_{c:04d}.wav"),SAMPLE_RATE,(a*32767).astype(np.int16)); c+=1
    # Normal operation: one short beep pattern with a small amount of noise.
    for i in range(n//10):
        a = np.clip(_synthesize_beep_pattern([(200,True),(300,False)],random.uniform(900,1100),0.1)+np.random.randn(tgt)*0.005,-1,1)
        wav.write(str(d/f"normal_operation_beep_{i:04d}.wav"),SAMPLE_RATE,(a*32767).astype(np.int16))
    # System crash: one of four randomly chosen sound kinds per clip.
    for i in range(n):
        t = random.choice(["burst","glitch","hang","feedback"])
        # burst: silence with one decaying noise burst that starts in the first half of the clip.
        if t=="burst": a=np.zeros(tgt,np.float32);s=random.randint(0,tgt//2);bl=min(random.randint(SAMPLE_RATE//4,SAMPLE_RATE*2),tgt-s);a[s:s+bl]=(np.random.randn(bl)*random.uniform(0.3,0.8)*np.exp(-np.linspace(0,3,bl))).astype(np.float32)
        # glitch: a 50-500 sample noise chunk repeated to fill the clip.
        elif t=="glitch": ch=np.random.randn(random.randint(50,500)).astype(np.float32)*0.3;a=np.tile(ch,tgt//len(ch)+1)[:tgt]
        # hang: quiet noise whose first 0.25-1 s is replaced by louder noise.
        elif t=="hang": a=np.random.randn(tgt).astype(np.float32)*0.01;cc=random.randint(SAMPLE_RATE//4,SAMPLE_RATE);a[:cc]=np.random.randn(cc)*0.2
        # feedback: a 200-2000 Hz sine whose amplitude rises linearly over the clip.
        else: f=random.uniform(200,2000);tt=np.linspace(0,AUDIO_DURATION,tgt);a=(0.4*np.sin(2*np.pi*f*tt)*np.linspace(0.1,1,tgt)).astype(np.float32)
        wav.write(str(d/f"system_crash_synth_{i:04d}.wav"),SAMPLE_RATE,(np.clip(a,-1,1)*32767).astype(np.int16))
    # Storage failure: a 40-80 Hz hum with short decaying noise clicks.
    for i in range(n):
        a=np.zeros(tgt,np.float32);hf=random.uniform(40,80);tt=np.linspace(0,AUDIO_DURATION,tgt)
        a+=random.uniform(0.02,0.08)*np.sin(2*np.pi*hf*tt)
        # ci: click interval in seconds, nc: number of clicks, cd: click length in samples.
        ci=random.uniform(0.3,1.5);nc=int(AUDIO_DURATION/ci);cd=int(SAMPLE_RATE*random.uniform(0.005,0.02))
        for j in range(nc):
            # Click position jittered by up to 100 samples and clamped to stay inside the clip.
            p=max(0,min(int(j*ci*SAMPLE_RATE)+random.randint(-100,100),tgt-cd))
            a[p:p+cd]+=(np.random.randn(cd)*random.uniform(0.2,0.6)*np.exp(-np.linspace(0,8,cd))).astype(np.float32)
        wav.write(str(d/f"storage_failure_hdd_{i:04d}.wav"),SAMPLE_RATE,(np.clip(a,-1,1)*32767).astype(np.int16))
    print(f" Done: {c} beeps, {n//10} normal, {n} crash, {n} HDD")
def generate_synth_images(output_dir, n=500):
    """Render ``n`` synthetic screenshots per fault class as JPEG files.

    Images are written to ``output_dir/synth_images/<class>/<class>_<NNNN>.jpg``
    after being resized to ``IMAGE_SIZE``; the JPEG quality is drawn at random
    from 80 to 95 for every file.
    """
    print("\n" + "="*60 + "\nSYNTHETIC IMAGES\n" + "="*60)
    stop_codes = [("0x0000007E","SYSTEM_THREAD_EXCEPTION_NOT_HANDLED"),("0x0000003B","SYSTEM_SERVICE_EXCEPTION"),("0x000000EF","CRITICAL_PROCESS_DIED"),("0x00000133","DPC_WATCHDOG_VIOLATION")]
    post_errors = ["ERROR: Boot device not found","CMOS checksum error","NTLDR is missing","DISK BOOT FAILURE","Reboot and Select proper Boot device"]
    smart_errors = ["SMART Failure Predicted","Reallocated Sector Count: 1624 (CRITICAL)","Windows found 208 bad sectors","Disk read error occurred"]

    # (class name, renderer, positional arguments for the renderer)
    renderers = (
        ("system_crash", _bsod, (stop_codes,)),
        ("boot_failure", _post, (post_errors,)),
        ("overheating_fan", _therm, ()),
        ("storage_failure", _stor, (smart_errors,)),
        ("normal_operation", _norm, ()),
    )
    for cls, render, render_args in renderers:
        class_dir = output_dir / "synth_images" / cls
        class_dir.mkdir(parents=True, exist_ok=True)
        for idx in range(n):
            shot = render(*render_args).resize(IMAGE_SIZE, Image.LANCZOS)
            target = class_dir / f"{cls}_{idx:04d}.jpg"
            shot.save(str(target), quality=random.randint(80, 95))
        print(f" {cls}: {n}")
def _bsod(E):
    """Draw a 1920x1080 mock stop-error screen.

    ``E`` is a sequence of ``(hex_code, stop_name)`` pairs; one pair is picked
    at random and only its ``stop_name`` is drawn.
    """
    width, height = 1920, 1080
    _hex_code, stop_name = random.choice(E)
    background = random.choice(["#0078D7", "#000080"])
    img = Image.new("RGB", (width, height), background)
    draw = ImageDraw.Draw(img)

    left = int(width * .05)
    draw.text((left, int(height * .08)), ":(", fill="white")
    draw.text((left, int(height * .25)), "Your PC ran into a problem.", fill="white")
    draw.text((left, int(height * .35)), f"{random.randint(0,100)}% complete", fill="white")
    draw.text((int(width * .15), int(height * .72)), f"Stop code: {stop_name}", fill="white")
    return img
def _post(E):
    """Draw a black mock POST screen showing one random error message from ``E``."""
    width, height = random.choice([(1920,1080),(1024,768)])
    img = Image.new("RGB", (width, height), "#000000")
    draw = ImageDraw.Draw(img)

    vendor = random.choice(["AMI BIOS","Award BIOS","Phoenix BIOS"])
    draw.text((20, 20), vendor, fill="#AAAAAA")
    draw.text((20, 50), "CPU: Intel Core i7-12700K", fill="#AAFFAA")
    memory_line = f"Memory Test: {random.choice([8192,16384,32768])}MB OK"
    draw.text((20, 80), memory_line, fill="#AAFFAA")
    error_line = random.choice(E)
    draw.text((20, 140), error_line, fill="#FF0000")
    draw.text((20, 180), "Press F1 to Resume", fill="#FFFFFF")
    return img
def _therm():
    """Draw a 1920x1080 mock overheating screen in one of three random layouts.

    The layouts are a hardware-monitor window ("hw"), a BIOS warning ("bios")
    and a notification popup in the bottom-right corner ("pop").
    """
    width, height = 1920, 1080
    layout = random.choice(["hw","bios","pop"])

    if layout == "hw":
        img = Image.new("RGB", (width, height), "#F0F0F0")
        draw = ImageDraw.Draw(img)
        draw.rectangle([0, 0, width, 40], fill="#0078D4")
        draw.text((10, 10), "HW Monitor - CRITICAL", fill="white")
        top = 60
        # One line per sensor, 35 pixels apart.
        for row, sensor in enumerate(["CPU","GPU","VRM"]):
            draw.text((20, top + row * 35), f"{sensor}: {random.randint(88,110)}°C [CRITICAL]", fill="#FF0000")
    elif layout == "bios":
        img = Image.new("RGB", (width, height), "#000000")
        draw = ImageDraw.Draw(img)
        x0, y0 = width // 4, height // 4
        draw.text((x0, y0), "*** WARNING ***", fill="#FF0000")
        draw.text((x0, y0 + 40), f"CPU: {random.randint(95,110)}°C", fill="#FF0000")
        draw.text((x0, y0 + 80), "System shutting down.", fill="#FFAA00")
    else:
        img = Image.new("RGB", (width, height), "#1E1E1E")
        draw = ImageDraw.Draw(img)
        box_x = width - 420
        box_y = height - 210
        draw.rectangle([box_x, box_y, box_x + 400, box_y + 150], fill="#2D2D2D", outline="#FF6600", width=2)
        draw.text((box_x + 15, box_y + 10), "Critical Temperature", fill="#FF6600")
        draw.text((box_x + 15, box_y + 40), f"CPU: {random.randint(90,105)}°C", fill="#FF0000")
    return img
def _stor(E):
    """Draw a black mock storage-failure screen.

    Picks at random between a warning that shows one message from ``E``
    ("smart") and a CHKDSK progress screen ("chkdsk").
    """
    width, height = random.choice([(1920,1080),(1280,1024)])
    img = Image.new("RGB", (width, height), "#000000")
    draw = ImageDraw.Draw(img)
    layout = random.choice(["smart","chkdsk"])

    if layout == "smart":
        x0, y0 = width // 6, height // 3
        draw.text((x0, y0), "WARNING:", fill="#FFAA00")
        draw.text((x0, y0 + 40), random.choice(E), fill="#FF0000")
        draw.text((x0, y0 + 80), "Back up immediately.", fill="#FFFFFF")
        return img

    draw.text((20, 20), "Checking file system on C:", fill="#CCCCCC")
    draw.text((20, 60), f"CHKDSK stage {random.randint(1,5)}/5... {random.randint(5,95)}%", fill="#FFFFFF")
    draw.text((20, 100), f"Found {random.randint(1,500)} bad sectors.", fill="#FF4444")
    return img
def _norm():
    """Draw a 1920x1080 mock desktop: a solid background, a taskbar strip and a clock."""
    width, height = 1920, 1080
    wallpaper = random.choice(["#0078D4","#1B5E20","#283593","#004D40","#263238"])
    img = Image.new("RGB", (width, height), wallpaper)
    draw = ImageDraw.Draw(img)

    # Taskbar along the bottom edge with a random clock text near its right end.
    draw.rectangle([0, height - 48, width, height], fill="#1F1F1F")
    hour = random.randint(1, 12)
    minute = random.randint(0, 59)
    draw.text((width - 100, height - 35), f"{hour:02d}:{minute:02d} PM", fill="#FFFFFF")
    return img
def _files_by_class_dir(root, pattern):
    """Group files matching *pattern* by the fault-class sub-directory of *root* holding them."""
    grouped = defaultdict(list)
    if root.exists():
        for class_dir in root.iterdir():
            if class_dir.is_dir() and class_dir.name in FAULT_NAME_TO_ID:
                grouped[class_dir.name].extend(str(f) for f in class_dir.glob(pattern))
    return grouped


def _wavs_by_name_prefix(root):
    """Group WAV files under *root* by the fault-class name their file name starts with."""
    grouped = defaultdict(list)
    if root.exists():
        for f in root.rglob("*.wav"):
            stem = f.stem.lower()
            for cn in FAULT_NAME_TO_ID:
                if stem.startswith(cn):
                    grouped[cn].append(str(f))
                    break
    return grouped


def _pair_samples(audio_by_class, image_by_class, max_per_class):
    """Pair audio and image paths per class into sample dicts, capped at *max_per_class*."""
    samples = []
    for cn, cid in FAULT_NAME_TO_ID.items():
        au = list(audio_by_class.get(cn, []))
        im = list(image_by_class.get(cn, []))
        if not au and not im:
            continue
        # Shuffle before applying the cap. Files are collected synthetic-first
        # and YouTube-last, so an unshuffled cap would keep only the
        # first-collected files and drop the recordings collected later.
        random.shuffle(au)
        random.shuffle(im)
        n = min(max(len(au), len(im)), max_per_class)
        for i in range(n):
            # The shorter list is cycled so every sample has both modalities when available.
            audio_path = au[i % len(au)] if au else None
            image_path = im[i % len(im)] if im else None
            src = "youtube" if audio_path and "yt_" in audio_path else "mixed"
            samples.append({"fault_class": cid, "fault_name": cn, "audio_path": audio_path,
                            "image_path": image_path, "source": src})
    return samples


def build_and_upload(output_dir, max_per_class=500):
    """Assemble the collected files into a dataset and push it to the Hub.

    Collects WAV files from ``synth_audio``, ``hf_audio``, ``esc50_audio`` and
    ``yt_audio`` and JPEG files from ``synth_images`` and ``yt_frames`` under
    ``output_dir``, pairs them per fault class (at most ``max_per_class``
    samples per class), splits them 70/15/15 into train/validation/test,
    stratified by class, and uploads the result to
    ``Ellaft/pc-fault-real-dataset``. Directories that do not exist are
    skipped. Returns ``None``; nothing is uploaded when no sample was found.
    """
    print("\n" + "="*60 + "\nBUILDING + UPLOADING\n" + "="*60)

    audio_by_class = defaultdict(list)
    for sub in ["synth_audio", "hf_audio", "esc50_audio"]:
        for cn, files in _wavs_by_name_prefix(output_dir / sub).items():
            audio_by_class[cn].extend(files)
    for cn, files in _files_by_class_dir(output_dir / "yt_audio", "*.wav").items():
        audio_by_class[cn].extend(files)

    image_by_class = defaultdict(list)
    for sub in ["synth_images", "yt_frames"]:
        for cn, files in _files_by_class_dir(output_dir / sub, "*.jpg").items():
            image_by_class[cn].extend(files)

    print("Audio:")
    for c in FAULT_CLASSES.values():
        print(f" {c}: {len(audio_by_class[c])}")
    print("Images:")
    for c in FAULT_CLASSES.values():
        print(f" {c}: {len(image_by_class[c])}")

    all_samples = _pair_samples(audio_by_class, image_by_class, max_per_class)
    random.shuffle(all_samples)
    print(f"\nTotal: {len(all_samples)}")
    dist = Counter(s["fault_name"] for s in all_samples)
    for c, n in sorted(dist.items()):
        print(f" {c}: {n}")
    if not all_samples:
        # An empty dataset cannot be split; stop before touching the Hub.
        print("No samples found; nothing to upload.")
        return

    from datasets import Dataset, Audio, DatasetDict, ClassLabel
    from datasets import Image as HFImage
    data = {
        "audio": [s["audio_path"] for s in all_samples],
        "image": [s["image_path"] for s in all_samples],
        "fault_class": [s["fault_class"] for s in all_samples],
        "fault_name": [s["fault_name"] for s in all_samples],
        "source": [s["source"] for s in all_samples],
    }
    ds = Dataset.from_dict(data)
    ds = ds.cast_column("audio", Audio(sampling_rate=SAMPLE_RATE))
    ds = ds.cast_column("image", HFImage())
    ds = ds.cast_column("fault_class", ClassLabel(names=list(FAULT_CLASSES.values())))

    # 70 % train, then the remaining 30 % is halved into validation and test.
    sp = ds.train_test_split(test_size=0.3, seed=42, stratify_by_column="fault_class")
    tv = sp["test"].train_test_split(test_size=0.5, seed=42, stratify_by_column="fault_class")
    final = DatasetDict({"train": sp["train"], "validation": tv["train"], "test": tv["test"]})
    print(f"Train:{len(final['train'])}, Val:{len(final['validation'])}, Test:{len(final['test'])}")
    final.push_to_hub("Ellaft/pc-fault-real-dataset", private=False)
    print("Uploaded to https://huggingface.co/datasets/Ellaft/pc-fault-real-dataset")
def _main():
    """Run the full pipeline: scrape, download, synthesize, then build and upload."""
    output_dir = Path("./dataset_extended")
    output_dir.mkdir(exist_ok=True)

    run_youtube_scraping(output_dir, max_videos_per_query=5)
    download_cooling_fans(output_dir, max_per_class=500)
    # ESC-50 is optional: any failure is reported and the run continues.
    try:
        download_esc50(output_dir, max_per_class=80)
    except Exception as e:
        print(f"ESC-50 skipped: {e}")
    generate_synth_audio(output_dir, n=500)
    generate_synth_images(output_dir, n=500)
    build_and_upload(output_dir, max_per_class=500)


if __name__ == "__main__":
    _main()