""" PC Fault Detection — Complete Dataset Builder =============================================== Builds a real multimodal dataset from 4 sources: 1. YouTube scraping (audio + video frames) 2. HuggingFace datasets (cooling-fans, FSD50K) 3. Synthetic audio generation (beep codes, fan noise augmentation) 4. Synthetic visual generation (BSOD, POST screens, SMART errors, thermal warnings) NO API KEYS NEEDED — everything uses public tools. Output: HuggingFace dataset at Ellaft/pc-fault-real-dataset Columns: audio (Audio), image (Image), fault_class (int), fault_name (str), source (str) Usage: python build_dataset.py # Build everything python build_dataset.py --skip_youtube # Skip YouTube (slow), use synth+HF only python build_dataset.py --youtube_only # Only scrape YouTube python build_dataset.py --upload # Upload to HuggingFace Hub python build_dataset.py --max_per_class 200 # Limit samples per class Requirements: pip install -r requirements_data.txt # Also needs ffmpeg for video frame extraction: # Ubuntu: sudo apt install ffmpeg # Mac: brew install ffmpeg # Or: pip install imageio-ffmpeg (Python-only fallback) """ import os, sys, json, random, glob, shutil, argparse, subprocess, re import numpy as np from pathlib import Path from PIL import Image, ImageDraw, ImageFont from collections import Counter, defaultdict # ============================================================================ # Constants # ============================================================================ FAULT_CLASSES = { 0: "normal_operation", 1: "boot_failure", 2: "overheating_fan", 3: "storage_failure", 4: "system_crash", } FAULT_NAME_TO_ID = {v: k for k, v in FAULT_CLASSES.items()} DATA_ROOT = Path("./dataset_build") AUDIO_DIR = DATA_ROOT / "audio" IMAGE_DIR = DATA_ROOT / "image" FINAL_DIR = DATA_ROOT / "final" SAMPLE_RATE = 16000 AUDIO_DURATION = 5.0 # seconds IMAGE_SIZE = (224, 224) # ============================================================================ # PART 1: YouTube Scraper 
# ============================================================================ # Curated YouTube search queries per fault class YOUTUBE_QUERIES = { "normal_operation": [ "quiet gaming PC idle fan noise ambient", "silent PC build running quiet computer sound", "computer fan white noise sleep 1 hour", "desktop PC running normally ambient sound", ], "boot_failure": [ "BIOS beep codes explained AMI Award", "computer beep codes troubleshooting POST", "motherboard beep code 3 long beeps", "PC won't boot beeping sound BIOS", "UEFI boot failure no display beep", ], "overheating_fan": [ "loud PC fan noise grinding bearing failure", "CPU fan rattling noise overheating computer", "graphics card fan loud grinding noise", "laptop fan very loud overheating spinning", "PC fan bearing failure wobble noise repair", ], "storage_failure": [ "hard drive clicking noise dying HDD sound", "HDD click of death failing hard drive", "hard drive failure sounds different brands", "SSD failure symptoms clicking grinding noise", "hard disk bad sector read error sound", ], "system_crash": [ "Windows blue screen of death BSOD live", "computer crash freeze blue screen error", "Windows 10 11 BSOD crash stop code", "kernel panic Linux system crash", "PC randomly crashes blue screen gaming", ], } def _find_ffmpeg(): """Find ffmpeg binary — system or Python package.""" # Try system ffmpeg first try: result = subprocess.run(["ffmpeg", "-version"], capture_output=True, timeout=5) if result.returncode == 0: return "ffmpeg" except (FileNotFoundError, subprocess.TimeoutExpired): pass # Try Python imageio-ffmpeg try: import imageio_ffmpeg return imageio_ffmpeg.get_ffmpeg_exe() except ImportError: pass return None def scrape_youtube(fault_class, query, output_dir, max_videos=5, max_duration=120): """ Scrape YouTube: download audio as WAV + extract video frames. Uses yt-dlp (no API key needed, no login needed). 
""" audio_out = output_dir / "audio" / fault_class frames_out = output_dir / "frames" / fault_class audio_out.mkdir(parents=True, exist_ok=True) frames_out.mkdir(parents=True, exist_ok=True) print(f" Searching: '{query}' (max {max_videos} videos)...") # Step 1: Download audio audio_cmd = [ "yt-dlp", f"ytsearch{max_videos}:{query}", "--extract-audio", "--audio-format", "wav", "--audio-quality", "0", "--match-filter", f"duration<{max_duration}", "--max-downloads", str(max_videos), "--no-playlist", "--quiet", "--no-warnings", "-o", str(audio_out / "%(id)s.%(ext)s"), ] # Step 2: Download video for frame extraction video_cmd = [ "yt-dlp", f"ytsearch{max_videos}:{query}", "--format", "worst[ext=mp4]", "--match-filter", f"duration<{max_duration}", "--max-downloads", str(max_videos), "--no-playlist", "--quiet", "--no-warnings", "-o", str(frames_out / "%(id)s.%(ext)s"), ] try: subprocess.run(audio_cmd, timeout=180, capture_output=True) except (subprocess.TimeoutExpired, Exception) as e: print(f" ⚠ Audio download timeout/error: {e}") try: subprocess.run(video_cmd, timeout=180, capture_output=True) except (subprocess.TimeoutExpired, Exception) as e: print(f" ⚠ Video download timeout/error: {e}") # Step 3: Extract frames using ffmpeg ffmpeg_bin = _find_ffmpeg() if ffmpeg_bin: video_files = list(frames_out.glob("*.mp4")) + list(frames_out.glob("*.webm")) for vf in video_files: frame_prefix = frames_out / f"{vf.stem}_frame" ffmpeg_cmd = [ ffmpeg_bin, "-i", str(vf), "-vf", "fps=0.5,scale=224:224", "-q:v", "2", "-y", "-loglevel", "error", str(frame_prefix) + "_%04d.jpg", ] try: subprocess.run(ffmpeg_cmd, timeout=60, capture_output=True) except Exception: pass vf.unlink(missing_ok=True) else: print(" ⚠ ffmpeg not found, skipping frame extraction") print(" Install: sudo apt install ffmpeg OR pip install imageio-ffmpeg") n_audio = len(list(audio_out.glob("*.wav"))) n_frames = len(list(frames_out.glob("*.jpg"))) print(f" ✓ {n_audio} audio files, {n_frames} frames") return n_audio, 
n_frames def run_youtube_scraper(output_dir, max_videos_per_query=3): """Scrape YouTube for all fault classes.""" print("\n" + "="*60) print("PART 1: YouTube Scraping") print("="*60) print("(No API key needed — uses yt-dlp public scraping)") print("(If blocked, run on your local machine or Google Colab)") stats = {} for fault_class, queries in YOUTUBE_QUERIES.items(): print(f"\n[{fault_class}]") total_audio, total_frames = 0, 0 for q in queries: na, nf = scrape_youtube( fault_class, q, output_dir, max_videos=max_videos_per_query, max_duration=120) total_audio += na total_frames += nf stats[fault_class] = {"audio": total_audio, "frames": total_frames} print(f" Total: {total_audio} audio, {total_frames} frames") return stats # ============================================================================ # PART 2: HuggingFace Dataset Downloaders # ============================================================================ def download_cooling_fans(output_dir, max_per_class=200): """ Download HenriqueFrancaa/cooling-fans-db0 — real PC cooling fan recordings. 
Normal → normal_operation, Abnormal → overheating_fan """ print("\n" + "="*60) print("PART 2a: Cooling Fans Dataset (HuggingFace)") print("="*60) from datasets import load_dataset import soundfile as sf audio_out = output_dir / "hf_audio" audio_out.mkdir(parents=True, exist_ok=True) try: ds = load_dataset("HenriqueFrancaa/cooling-fans-db0", split="train") print(f" Loaded {len(ds)} samples") except Exception as e: print(f" ⚠ Failed to load cooling-fans: {e}") try: from huggingface_hub import snapshot_download path = snapshot_download("HenriqueFrancaa/cooling-fans-db0", repo_type="dataset") wav_files = glob.glob(os.path.join(path, "**/*.wav"), recursive=True) print(f" Found {len(wav_files)} WAV files via snapshot") normal_count, abnormal_count = 0, 0 for wf in wav_files: path_lower = wf.lower() if "abnormal" in path_lower and abnormal_count < max_per_class: out_path = audio_out / f"overheating_fan_cfan_{abnormal_count:04d}.wav" shutil.copy2(wf, out_path) abnormal_count += 1 elif "normal" in path_lower and normal_count < max_per_class: out_path = audio_out / f"normal_operation_cfan_{normal_count:04d}.wav" shutil.copy2(wf, out_path) normal_count += 1 print(f" ✓ Saved {normal_count} normal, {abnormal_count} abnormal fan clips") return {"normal_operation": normal_count, "overheating_fan": abnormal_count} except Exception as e2: print(f" ✗ Snapshot also failed: {e2}") return {} normal_count, abnormal_count = 0, 0 for i, sample in enumerate(ds): audio = sample["audio"] arr = np.array(audio["array"], dtype=np.float32) sr = audio["sampling_rate"] if i % 2 == 0 and normal_count < max_per_class: out_path = audio_out / f"normal_operation_cfan_{normal_count:04d}.wav" sf.write(str(out_path), arr, sr) normal_count += 1 elif abnormal_count < max_per_class: out_path = audio_out / f"overheating_fan_cfan_{abnormal_count:04d}.wav" sf.write(str(out_path), arr, sr) abnormal_count += 1 if normal_count >= max_per_class and abnormal_count >= max_per_class: break print(f" ✓ Saved 
{normal_count} normal, {abnormal_count} abnormal fan clips") return {"normal_operation": normal_count, "overheating_fan": abnormal_count} def download_fsd50k_relevant(output_dir, max_per_class=100): """Download relevant sound events from FSD50K.""" print("\n" + "="*60) print("PART 2b: FSD50K Sound Events (HuggingFace)") print("="*60) from datasets import load_dataset import soundfile as sf audio_out = output_dir / "hf_audio" audio_out.mkdir(parents=True, exist_ok=True) LABEL_MAP = { "overheating_fan": ["fan", "mechanical_fan", "whir", "buzz", "hum", "engine"], "boot_failure": ["beep", "bleep", "alarm", "buzzer", "siren"], "storage_failure": ["click", "tick", "ratchet", "mechanical"], "normal_operation": ["keyboard", "typing", "mouse", "computer_keyboard"], } try: ds = load_dataset("Fhrozen/FSD50k", split="validation") print(f" Loaded {len(ds)} samples from FSD50K validation split") except Exception as e: print(f" ⚠ Failed to load FSD50K: {e}") return {} counts = defaultdict(int) for sample in ds: label = str(sample.get("label", "")).lower() if not label: continue for fault_class, keywords in LABEL_MAP.items(): if any(kw in label for kw in keywords) and counts[fault_class] < max_per_class: audio = sample["audio"] arr = np.array(audio["array"], dtype=np.float32) sr = audio["sampling_rate"] idx = counts[fault_class] out_path = audio_out / f"{fault_class}_fsd_{idx:04d}.wav" sf.write(str(out_path), arr, sr) counts[fault_class] += 1 break for fc, c in counts.items(): print(f" ✓ {fc}: {c} clips from FSD50K") return dict(counts) # ============================================================================ # PART 3: Synthetic Audio Generation # ============================================================================ def _synthesize_beep_pattern(pattern, freq=1000, jitter=0.1): """Synthesize a beep code pattern with timing jitter.""" audio = [] for duration_ms, is_beep in pattern: actual_duration = duration_ms * (1 + random.uniform(-jitter, jitter)) n_samples = 
int(SAMPLE_RATE * actual_duration / 1000) if is_beep: t = np.linspace(0, actual_duration / 1000, n_samples) beep = 0.5 * np.sign(np.sin(2 * np.pi * freq * t)) beep = 0.7 * beep + 0.3 * (0.5 * np.sin(2 * np.pi * freq * t)) envelope = np.ones(n_samples) attack = min(200, n_samples // 4) envelope[:attack] = np.linspace(0, 1, attack) envelope[-attack:] = np.linspace(1, 0, attack) beep = beep * envelope audio.append(beep) else: audio.append(np.zeros(n_samples)) result = np.concatenate(audio) target_len = int(SAMPLE_RATE * AUDIO_DURATION) if len(result) < target_len: repeats = target_len // len(result) + 1 result = np.tile(result, repeats)[:target_len] else: result = result[:target_len] return result.astype(np.float32) def generate_beep_codes(output_dir, n_per_pattern=30): """Generate BIOS POST beep code audio with real AMI/Award/Phoenix patterns.""" print("\n" + "="*60) print("PART 3a: Synthetic BIOS Beep Codes") print("="*60) import scipy.io.wavfile as wav audio_out = output_dir / "synth_audio" audio_out.mkdir(parents=True, exist_ok=True) BEEP_PATTERNS = { "ami_dram_refresh": [(800, True), (200, False), (200, True), (200, False)], "ami_memory_parity": [(200, True), (100, False)] * 3, "ami_base_memory": [(200, True), (100, False)] * 4, "ami_timer_failure": [(200, True), (100, False)] * 5, "ami_cpu_failure": [(200, True), (100, False)] * 6, "ami_keyboard_ctrl": [(200, True), (100, False)] * 7, "ami_video_failure": [(800, True), (200, False), (800, True), (200, False), (200, True)], "award_general_failure": [(800, True), (300, False)] * 1, "award_video_error": [(800, True), (200, False), (200, True), (200, False), (200, True)], "award_no_video": [(800, True), (200, False)] * 2, "phoenix_cpu_error": [(200, True), (300, False), (200, True), (300, False), (200, True), (500, False), (200, True), (300, False), (200, True)], "phoenix_memory_error": [(200, True), (300, False), (200, True), (500, False), (200, True), (300, False), (200, True), (300, False), (200, True)], } 
NORMAL_BEEP = [(200, True), (300, False)] count = 0 for pattern_name, pattern in BEEP_PATTERNS.items(): for i in range(n_per_pattern): freq = random.uniform(800, 1200) audio = _synthesize_beep_pattern(pattern, freq, jitter=0.15) noise = np.random.randn(len(audio)) * random.uniform(0.005, 0.02) audio = np.clip(audio + noise, -1, 1) out_path = audio_out / f"boot_failure_beep_{count:04d}.wav" wav.write(str(out_path), SAMPLE_RATE, (audio * 32767).astype(np.int16)) count += 1 for i in range(n_per_pattern): freq = random.uniform(900, 1100) audio = _synthesize_beep_pattern(NORMAL_BEEP, freq, jitter=0.1) noise = np.random.randn(len(audio)) * 0.005 audio = np.clip(audio + noise, -1, 1) out_path = audio_out / f"normal_operation_beep_{i:04d}.wav" wav.write(str(out_path), SAMPLE_RATE, (audio * 32767).astype(np.int16)) print(f" ✓ Generated {count} boot_failure beep codes + {n_per_pattern} normal POST beeps") return count def generate_crash_audio(output_dir, n_samples=100): """Generate system crash audio: noise bursts, glitches, hangs, feedback loops.""" print("\n" + "="*60) print("PART 3b: Synthetic System Crash Audio") print("="*60) import scipy.io.wavfile as wav audio_out = output_dir / "synth_audio" audio_out.mkdir(parents=True, exist_ok=True) count = 0 target_len = int(SAMPLE_RATE * AUDIO_DURATION) for i in range(n_samples): crash_type = random.choice(["noise_burst", "glitch", "hang", "feedback"]) if crash_type == "noise_burst": audio = np.zeros(target_len, dtype=np.float32) burst_start = random.randint(0, target_len // 2) burst_len = random.randint(SAMPLE_RATE // 4, SAMPLE_RATE * 2) burst = np.random.randn(min(burst_len, target_len - burst_start)).astype(np.float32) burst *= random.uniform(0.3, 0.8) decay = np.exp(-np.linspace(0, 3, len(burst))) audio[burst_start:burst_start + len(burst)] = burst * decay elif crash_type == "glitch": chunk_len = random.randint(50, 500) chunk = np.random.randn(chunk_len).astype(np.float32) * 0.3 audio = np.tile(chunk, target_len // chunk_len 
+ 1)[:target_len] for _ in range(random.randint(1, 5)): start = random.randint(0, target_len - SAMPLE_RATE) duration = random.randint(SAMPLE_RATE // 10, SAMPLE_RATE) audio[start:start + duration] = 0 elif crash_type == "hang": audio = np.random.randn(target_len).astype(np.float32) * 0.01 cutoff = random.randint(SAMPLE_RATE // 4, SAMPLE_RATE) audio[:cutoff] = np.random.randn(cutoff) * 0.2 else: # feedback freq = random.uniform(200, 2000) t = np.linspace(0, AUDIO_DURATION, target_len) audio = 0.4 * np.sin(2 * np.pi * freq * t) audio *= np.linspace(0.1, 1.0, target_len) audio += 0.2 * np.sin(2 * np.pi * freq * 2 * t) audio = audio.astype(np.float32) audio = np.clip(audio, -1, 1) out_path = audio_out / f"system_crash_synth_{count:04d}.wav" wav.write(str(out_path), SAMPLE_RATE, (audio * 32767).astype(np.int16)) count += 1 print(f" ✓ Generated {count} system_crash audio samples") return count def generate_hdd_click_audio(output_dir, n_samples=100): """Generate HDD clicking/grinding sounds for storage_failure.""" print("\n" + "="*60) print("PART 3c: Synthetic HDD Click Audio") print("="*60) import scipy.io.wavfile as wav audio_out = output_dir / "synth_audio" audio_out.mkdir(parents=True, exist_ok=True) count = 0 target_len = int(SAMPLE_RATE * AUDIO_DURATION) for i in range(n_samples): audio = np.zeros(target_len, dtype=np.float32) # Background motor hum hum_freq = random.uniform(40, 80) t = np.linspace(0, AUDIO_DURATION, target_len) audio += random.uniform(0.02, 0.08) * np.sin(2 * np.pi * hum_freq * t) # Repetitive clicks click_interval = random.uniform(0.3, 1.5) n_clicks = int(AUDIO_DURATION / click_interval) click_duration = int(SAMPLE_RATE * random.uniform(0.005, 0.02)) for j in range(n_clicks): pos = int(j * click_interval * SAMPLE_RATE) pos += random.randint(-100, 100) pos = max(0, min(pos, target_len - click_duration)) click = np.random.randn(click_duration) * random.uniform(0.2, 0.6) click *= np.exp(-np.linspace(0, 8, click_duration)) audio[pos:pos + 
click_duration] += click.astype(np.float32) # Sometimes add grinding noise if random.random() < 0.3: grind_start = random.randint(0, target_len // 2) grind_len = random.randint(SAMPLE_RATE // 2, SAMPLE_RATE * 2) grind_len = min(grind_len, target_len - grind_start) grind = np.random.randn(grind_len) * 0.15 grind_freq = random.uniform(500, 3000) t_grind = np.linspace(0, grind_len / SAMPLE_RATE, grind_len) grind *= (1 + 0.5 * np.sin(2 * np.pi * grind_freq * t_grind)) audio[grind_start:grind_start + grind_len] += grind.astype(np.float32) audio = np.clip(audio, -1, 1) out_path = audio_out / f"storage_failure_hdd_{count:04d}.wav" wav.write(str(out_path), SAMPLE_RATE, (audio * 32767).astype(np.int16)) count += 1 print(f" ✓ Generated {count} storage_failure HDD click audio samples") return count # ============================================================================ # PART 4: Synthetic Visual Generation # ============================================================================ BSOD_ERRORS = [ ("0x0000007E", "SYSTEM_THREAD_EXCEPTION_NOT_HANDLED"), ("0x0000003B", "SYSTEM_SERVICE_EXCEPTION"), ("0x00000050", "PAGE_FAULT_IN_NONPAGED_AREA"), ("0x0000001A", "MEMORY_MANAGEMENT"), ("0x000000EF", "CRITICAL_PROCESS_DIED"), ("0xC0000005", "KERNEL_SECURITY_CHECK_FAILURE"), ("0x00000133", "DPC_WATCHDOG_VIOLATION"), ("0x000000D1", "DRIVER_IRQL_NOT_LESS_OR_EQUAL"), ("0x0000007F", "UNEXPECTED_KERNEL_MODE_TRAP"), ("0x00000124", "WHEA_UNCORRECTABLE_ERROR"), ("0x0000000A", "IRQL_NOT_LESS_OR_EQUAL"), ("0x0000001E", "KMODE_EXCEPTION_NOT_HANDLED"), ("0x000000C5", "DRIVER_CORRUPTED_EXPOOL"), ("0x00000019", "BAD_POOL_HEADER"), ("0x00000139", "KERNEL_SECURITY_CHECK_FAILURE"), ] POST_ERRORS = [ "ERROR: Boot device not found. Press F1 to continue...", "CMOS checksum error - Defaults loaded", "Keyboard error or no keyboard present", "NTLDR is missing. 
Press Ctrl+Alt+Del to restart", "DISK BOOT FAILURE, INSERT SYSTEM DISK AND PRESS ENTER", "Reboot and Select proper Boot device", "No boot device available - strike F1 to retry boot", "ERROR: CPU fan not detected. Press F1 to Resume.", "Primary IDE master failure. Press F1 to continue.", "CMOS Battery State Low. Press F1 to continue.", "Alert! Previous Reboot was due to voltage regulator failure", ] SMART_ERRORS = [ "SMART Failure Predicted on Hard Disk 0: WDC WD10EZEX-21WN4A0", "SMART Failure Predicted on Hard Disk 1: Seagate ST1000LM024", "WARNING: SMART Self-test log: FAILED read element", "Current Pending Sector Count: 208 (WARNING)", "Reallocated Sector Count: 1624 (CRITICAL)", "SMART error: Raw Read Error Rate exceeds threshold", "Uncorrectable Sector Count: 48 (FAILED)", "NTFS FILE SYSTEM: Volume C: is corrupt and unreadable", "Disk read error occurred. Press Ctrl+Alt+Del to restart.", "BAD_SYSTEM_CONFIG_INFO: Hard disk error detected", ] def generate_bsod_images(output_dir, n_samples=200): """Generate realistic BSOD images (Win10/11/7/XP styles).""" print("\n" + "="*60) print("PART 4a: Synthetic BSOD Images") print("="*60) img_out = output_dir / "synth_images" / "system_crash" img_out.mkdir(parents=True, exist_ok=True) for i in range(n_samples): win_version = random.choice([10, 11, 7, "xp"]) code, msg = random.choice(BSOD_ERRORS) if win_version in [10, 11]: img = _generate_win10_bsod(code, msg, win_version) elif win_version == 7: img = _generate_win7_bsod(code, msg) else: img = _generate_winxp_bsod(code, msg) img = img.resize(IMAGE_SIZE, Image.LANCZOS) if random.random() < 0.3: arr = np.array(img) noise = np.random.randint(-5, 5, arr.shape, dtype=np.int16) arr = np.clip(arr.astype(np.int16) + noise, 0, 255).astype(np.uint8) img = Image.fromarray(arr) img.save(str(img_out / f"bsod_{i:04d}.jpg"), quality=random.randint(75, 95)) print(f" ✓ Generated {n_samples} BSOD images") def _generate_win10_bsod(code, msg, version=10): w, h = 1920, 1080 color = "#0078D7" if 
version == 10 else "#000078" img = Image.new("RGB", (w, h), color=color) draw = ImageDraw.Draw(img) draw.text((int(w*0.05), int(h*0.08)), ":(", fill="white") y = int(h * 0.25) draw.text((int(w*0.05), y), "Your PC ran into a problem and needs to restart.", fill="white") draw.text((int(w*0.05), y+40), "We're just collecting some error info, and then we'll", fill="white") draw.text((int(w*0.05), y+70), "restart for you.", fill="white") pct = random.randint(0, 100) draw.text((int(w*0.05), y+130), f"{pct}% complete", fill="white") qr_x, qr_y = int(w*0.05), int(h*0.6) draw.rectangle([qr_x, qr_y, qr_x+100, qr_y+100], fill="white") for _ in range(50): bx, by = random.randint(qr_x+5, qr_x+95), random.randint(qr_y+5, qr_y+95) bs = random.randint(3, 8) draw.rectangle([bx, by, bx+bs, by+bs], fill="black") draw.text((int(w*0.15), int(h*0.72)), f"If you call a support person, give them this info:", fill="white") draw.text((int(w*0.15), int(h*0.76)), f"Stop code: {msg}", fill="white") return img def _generate_win7_bsod(code, msg): w, h = 1920, 1080 img = Image.new("RGB", (w, h), color="#000080") draw = ImageDraw.Draw(img) lines = [ "A problem has been detected and Windows has been shut down to prevent", "damage to your computer.", "", f"*** STOP: {code} ({msg})", "", "If this is the first time you've seen this error screen,", "restart your computer. 
If this screen appears again, follow these steps:", "", "Check to make sure any new hardware or software is properly installed.", "If this is a new installation, ask your hardware or software manufacturer", "for any Windows updates you might need.", "", f"Technical information:", f"*** STOP: {code} (0x00000001, 0x00000002, 0x00000000, 0x00000000)", ] y = 40 for line in lines: draw.text((40, y), line, fill="white"); y += 22 return img def _generate_winxp_bsod(code, msg): w, h = 1024, 768 img = Image.new("RGB", (w, h), color="#000080") draw = ImageDraw.Draw(img) lines = [ "A problem has been detected and Windows has been shut down to prevent", "damage to your computer.", "", f"STOP: {code} {msg}", "", "Beginning dump of physical memory.", "Physical memory dump complete.", "Contact your system administrator or technical support group.", ] y = 30 for line in lines: draw.text((20, y), line, fill="white"); y += 20 return img def generate_post_screens(output_dir, n_samples=200): """Generate BIOS POST failure screen images.""" print("\n" + "="*60) print("PART 4b: Synthetic BIOS POST Screens") print("="*60) img_out = output_dir / "synth_images" / "boot_failure" img_out.mkdir(parents=True, exist_ok=True) BIOS_VENDORS = [ "American Megatrends Inc. 
AMIBIOS (C)2024", "Award Modular BIOS v6.00PG", "Phoenix - AwardBIOS v6.00PG", "InsydeH2O Version 05.24.03.0007", ] CPUS = [ "Intel(R) Core(TM) i7-12700K @ 3.60GHz", "AMD Ryzen 7 5800X 8-Core Processor", "Intel(R) Core(TM) i9-14900K @ 3.20GHz", "AMD Ryzen 9 7950X 16-Core Processor", ] for i in range(n_samples): w, h = random.choice([(1920, 1080), (1024, 768)]) img = Image.new("RGB", (w, h), color="#000000") draw = ImageDraw.Draw(img) y = 20 draw.text((20, y), random.choice(BIOS_VENDORS), fill="#AAAAAA"); y += 25 draw.text((20, y), f"CPU: {random.choice(CPUS)}", fill="#AAFFAA"); y += 20 ram = random.choice([4096, 8192, 16384, 32768]) draw.text((20, y), f"Memory Test: {ram}MB", fill="#AAFFAA"); y += 20 checks = [("IDE Primary Master: WDC WD10EZEX", True), ("SATA 0: Samsung SSD 870 EVO", True)] for text, passed in checks: if random.random() < 0.8: draw.text((20, y), text, fill="#AAFFAA" if passed else "#FF4444"); y += 20 y += 20 draw.text((20, y), random.choice(POST_ERRORS), fill="#FF0000"); y += 25 draw.text((20, y), "Press F1 to Resume, F2 to enter SETUP", fill="#FFFFFF") img = img.resize(IMAGE_SIZE, Image.LANCZOS) img.save(str(img_out / f"post_{i:04d}.jpg"), quality=random.randint(80, 95)) print(f" ✓ Generated {n_samples} POST failure screen images") def generate_thermal_images(output_dir, n_samples=200): """Generate thermal warning images (HWMonitor, BIOS, Task Manager, popup styles).""" print("\n" + "="*60) print("PART 4c: Synthetic Thermal Warning Images") print("="*60) img_out = output_dir / "synth_images" / "overheating_fan" img_out.mkdir(parents=True, exist_ok=True) for i in range(n_samples): style = random.choice(["hwmonitor", "bios_warning", "popup"]) w, h = 1920, 1080 if style == "hwmonitor": img = Image.new("RGB", (w, h), color="#F0F0F0") draw = ImageDraw.Draw(img) draw.rectangle([0, 0, w, 40], fill="#0078D4") draw.text((10, 10), "HW Monitor - Temperature Critical!", fill="white") y = 60 for name, threshold in [("CPU Package", 85), ("GPU Core", 90), ("CPU 
VRM", 80)]: temp = random.randint(threshold, threshold + 20) color = "#FF0000" draw.text((20, y), f"{name}: {temp}°C [CRITICAL] ⚠ THERMAL THROTTLING", fill=color) y += 35 elif style == "bios_warning": img = Image.new("RGB", (w, h), color="#000000") draw = ImageDraw.Draw(img) temp = random.randint(95, 110) draw.text((w//4, h//4), "*** WARNING ***", fill="#FF0000") draw.text((w//4, h//4+40), f"CPU temperature: {temp}°C", fill="#FF0000") draw.text((w//4, h//4+80), "System will shut down in 10 seconds.", fill="#FFAA00") else: img = Image.new("RGB", (w, h), color="#1E1E1E") draw = ImageDraw.Draw(img) pw, ph = 400, 150 px, py = w-pw-20, h-ph-60 draw.rectangle([px, py, px+pw, py+ph], fill="#2D2D2D", outline="#FF6600", width=2) temp = random.randint(90, 105) draw.text((px+15, py+10), "⚠ Critical Temperature Warning", fill="#FF6600") draw.text((px+15, py+40), f"CPU temperature: {temp}°C", fill="#FF0000") draw.text((px+15, py+65), "Thermal throttling is active.", fill="#FFAA00") img = img.resize(IMAGE_SIZE, Image.LANCZOS) img.save(str(img_out / f"thermal_{i:04d}.jpg"), quality=random.randint(80, 95)) print(f" ✓ Generated {n_samples} thermal warning images") def generate_storage_error_images(output_dir, n_samples=200): """Generate disk/storage error screen images.""" print("\n" + "="*60) print("PART 4d: Synthetic Storage Error Images") print("="*60) img_out = output_dir / "synth_images" / "storage_failure" img_out.mkdir(parents=True, exist_ok=True) for i in range(n_samples): style = random.choice(["smart_warning", "disk_error", "chkdsk", "crystaldisk"]) w, h = random.choice([(1920, 1080), (1280, 1024)]) if style == "smart_warning": img = Image.new("RGB", (w, h), color="#000000") draw = ImageDraw.Draw(img) draw.text((w//6, h//3), "WARNING:", fill="#FFAA00") draw.text((w//6, h//3+40), random.choice(SMART_ERRORS), fill="#FF0000") draw.text((w//6, h//3+80), "Immediately back up your data.", fill="#FFFFFF") elif style == "disk_error": img = Image.new("RGB", (w, h), 
color="#000000") draw = ImageDraw.Draw(img) draw.text((20, h//3), "A disk read error occurred", fill="#CCCCCC") draw.text((20, h//3+30), "Press Ctrl+Alt+Del to restart", fill="#AAAAAA") elif style == "chkdsk": img = Image.new("RGB", (w, h), color="#000000") draw = ImageDraw.Draw(img) pct = random.randint(5, 95) draw.text((20, 20), "Checking file system on C:", fill="#CCCCCC") draw.text((20, 50), "The type of the file system is NTFS.", fill="#CCCCCC") draw.text((20, 100), f"CHKDSK is verifying files (stage {random.randint(1,5)} of 5)... {pct}%", fill="#FFFFFF") draw.text((20, 140), f"Windows found {random.randint(1,500)} bad sectors.", fill="#FF4444") else: img = Image.new("RGB", (w, h), color="#FFFFFF") draw = ImageDraw.Draw(img) draw.rectangle([0, 0, w, 40], fill="#3366CC") draw.text((10, 10), "CrystalDiskInfo - Health Status: CAUTION", fill="white") y = 60 health = random.choice(["Caution", "Bad"]) draw.text((20, y), f"Health Status: {health}", fill="#FF0000" if health=="Bad" else "#FFA500") y += 30 draw.text((20, y), f"Reallocated Sectors: {random.randint(50,2000)}", fill="#FF0000") img = img.resize(IMAGE_SIZE, Image.LANCZOS) img.save(str(img_out / f"storage_{i:04d}.jpg"), quality=random.randint(80, 95)) print(f" ✓ Generated {n_samples} storage error images") def generate_normal_images(output_dir, n_samples=200): """Generate normal operation desktop screenshots.""" print("\n" + "="*60) print("PART 4e: Synthetic Normal Desktop Images") print("="*60) img_out = output_dir / "synth_images" / "normal_operation" img_out.mkdir(parents=True, exist_ok=True) COLORS = ["#0078D4", "#1B5E20", "#283593", "#1A237E", "#004D40", "#311B92", "#880E4F", "#BF360C", "#006064", "#263238"] for i in range(n_samples): w, h = 1920, 1080 img = Image.new("RGB", (w, h), color=random.choice(COLORS)) draw = ImageDraw.Draw(img) # Gradient for y_pos in range(h): darken = int(40 * y_pos / h) r, g, b = img.getpixel((0, y_pos)) draw.line([(0, y_pos), (w, y_pos)], fill=(max(0,r-darken), 
max(0,g-darken), max(0,b-darken))) # Taskbar draw.rectangle([0, h-48, w, h], fill="#1F1F1F") draw.text((w-100, h-35), f"{random.randint(1,12):02d}:{random.randint(0,59):02d} PM", fill="#FFFFFF") # Maybe window if random.random() < 0.4: wx, wy = random.randint(100, w//2), random.randint(50, h//3) ww, wh = random.randint(400, 800), random.randint(300, 500) draw.rectangle([wx, wy, wx+ww, wy+wh], fill="#FFFFFF", outline="#CCCCCC") draw.rectangle([wx, wy, wx+ww, wy+30], fill="#F0F0F0") img = img.resize(IMAGE_SIZE, Image.LANCZOS) img.save(str(img_out / f"normal_{i:04d}.jpg"), quality=random.randint(80, 95)) print(f" ✓ Generated {n_samples} normal desktop images") # ============================================================================ # PART 5: Combine Everything into HF Dataset # ============================================================================ def build_final_dataset(build_dir, max_per_class=None): """Combine all sources into a unified dataset with paired audio+image samples.""" print("\n" + "="*60) print("PART 5: Building Final Dataset") print("="*60) audio_by_class = defaultdict(list) for audio_dir in [build_dir / "audio", build_dir / "hf_audio", build_dir / "synth_audio"]: if not audio_dir.exists(): continue for f in audio_dir.rglob("*.wav"): fname = f.stem.lower() for class_name in FAULT_NAME_TO_ID: if fname.startswith(class_name): audio_by_class[class_name].append(str(f)) break yt_audio_dir = build_dir / "youtube" / "audio" if yt_audio_dir.exists(): for class_dir in yt_audio_dir.iterdir(): if class_dir.is_dir() and class_dir.name in FAULT_NAME_TO_ID: for f in class_dir.glob("*.wav"): audio_by_class[class_dir.name].append(str(f)) image_by_class = defaultdict(list) for img_dir in [build_dir / "synth_images"]: if not img_dir.exists(): continue for class_dir in img_dir.iterdir(): if class_dir.is_dir() and class_dir.name in FAULT_NAME_TO_ID: for f in class_dir.glob("*.jpg"): image_by_class[class_dir.name].append(str(f)) for f in 
class_dir.glob("*.png"): image_by_class[class_dir.name].append(str(f)) yt_frames_dir = build_dir / "youtube" / "frames" if yt_frames_dir.exists(): for class_dir in yt_frames_dir.iterdir(): if class_dir.is_dir() and class_dir.name in FAULT_NAME_TO_ID: for f in class_dir.glob("*.jpg"): image_by_class[class_dir.name].append(str(f)) print("\n Audio files per class:") for cls in FAULT_CLASSES.values(): print(f" {cls}: {len(audio_by_class[cls])}") print("\n Image files per class:") for cls in FAULT_CLASSES.values(): print(f" {cls}: {len(image_by_class[cls])}") all_samples = [] for class_name, class_id in FAULT_NAME_TO_ID.items(): audios = audio_by_class[class_name] images = image_by_class[class_name] if not audios and not images: print(f" ⚠ No data for {class_name}, skipping") continue n_pairs = max(len(audios), len(images)) if max_per_class: n_pairs = min(n_pairs, max_per_class) for i in range(n_pairs): sample = {"fault_class": class_id, "fault_name": class_name} sample["audio_path"] = audios[i % len(audios)] if audios else None sample["image_path"] = images[i % len(images)] if images else None all_samples.append(sample) random.shuffle(all_samples) print(f"\n Total paired samples: {len(all_samples)}") class_dist = Counter(s["fault_name"] for s in all_samples) for cls, count in sorted(class_dist.items()): print(f" {cls}: {count}") manifest_path = build_dir / "dataset_manifest.json" with open(manifest_path, "w") as f: json.dump({"total_samples": len(all_samples), "class_distribution": dict(class_dist), "samples": all_samples}, f, indent=2) print(f"\n ✓ Manifest saved to {manifest_path}") return all_samples def upload_to_hub(build_dir, repo_id="Ellaft/pc-fault-real-dataset"): """Upload the built dataset to HuggingFace Hub.""" print("\n" + "="*60) print(f"Uploading to {repo_id}") print("="*60) from datasets import Dataset, Audio, DatasetDict from datasets import Image as HFImage manifest_path = build_dir / "dataset_manifest.json" with open(manifest_path) as f: manifest = 
# ============================================================================
# Main
# ============================================================================

def main(argv=None):
    """Parse command-line options and run the selected build stages.

    Stages run in order: YouTube scraping, HuggingFace downloads, synthetic
    audio, synthetic images, manifest build, and (with ``--upload``) the push
    to the Hub. ``--youtube_only`` stops after the scraping stage.

    Args:
        argv: Optional list of argument strings. ``None`` makes argparse read
            ``sys.argv[1:]``, which is what the script entry point relies on.

    Raises:
        SystemExit: On invalid arguments (argparse exits with status 2).
    """
    parser = argparse.ArgumentParser(description="PC Fault Detection Dataset Builder")
    parser.add_argument("--output_dir", default="./dataset_build")
    # Skipping YouTube and running only YouTube contradict each other; let
    # argparse reject the combination instead of silently doing nothing.
    youtube_mode = parser.add_mutually_exclusive_group()
    youtube_mode.add_argument("--skip_youtube", action="store_true",
                              help="Skip YouTube scraping")
    youtube_mode.add_argument("--youtube_only", action="store_true",
                              help="Only do YouTube scraping")
    parser.add_argument("--skip_hf", action="store_true",
                        help="Skip HuggingFace dataset downloads")
    parser.add_argument("--skip_synth_audio", action="store_true")
    parser.add_argument("--skip_synth_images", action="store_true")
    parser.add_argument("--max_per_class", type=int, default=300)
    parser.add_argument("--max_yt_videos", type=int, default=3,
                        help="Max YouTube videos per search query")
    parser.add_argument("--upload", action="store_true", help="Upload to HuggingFace Hub")
    parser.add_argument("--hub_repo", default="Ellaft/pc-fault-real-dataset")
    args = parser.parse_args(argv)

    # Validate before touching the filesystem. A cap of 0 would be read as
    # "no cap" by build_final_dataset while the generators produce nothing.
    if args.max_per_class < 1:
        parser.error("--max_per_class must be a positive integer")

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print("="*60)
    print("PC Fault Detection — Dataset Builder")
    print("="*60)
    print(f"Output: {output_dir}")
    print(f"Max per class: {args.max_per_class}")

    if not args.skip_youtube:
        run_youtube_scraper(output_dir / "youtube", max_videos_per_query=args.max_yt_videos)
    if args.youtube_only:
        print("\n✓ YouTube scraping complete.")
        return

    if not args.skip_hf:
        download_cooling_fans(output_dir, max_per_class=args.max_per_class)
        # FSD50K is best-effort: a failure here must not abort the build.
        try:
            download_fsd50k_relevant(output_dir, max_per_class=args.max_per_class // 3)
        except Exception as e:
            print(f" ⚠ FSD50K skipped: {e}")

    if not args.skip_synth_audio:
        generate_beep_codes(output_dir, n_per_pattern=args.max_per_class // 12)
        generate_crash_audio(output_dir, n_samples=args.max_per_class)
        generate_hdd_click_audio(output_dir, n_samples=args.max_per_class)

    if not args.skip_synth_images:
        generate_bsod_images(output_dir, n_samples=args.max_per_class)
        generate_post_screens(output_dir, n_samples=args.max_per_class)
        generate_thermal_images(output_dir, n_samples=args.max_per_class)
        generate_storage_error_images(output_dir, n_samples=args.max_per_class)
        generate_normal_images(output_dir, n_samples=args.max_per_class)

    build_final_dataset(output_dir, max_per_class=args.max_per_class)

    if args.upload:
        upload_to_hub(output_dir, repo_id=args.hub_repo)

    print("\n" + "="*60)
    print("✓ Dataset build complete!")
    print(f" Manifest: {output_dir}/dataset_manifest.json")
    # The upload hint is only useful when the upload has not just happened.
    if not args.upload:
        print(" To upload: python build_dataset.py --upload")
    print("="*60)


if __name__ == "__main__":
    main()