| """ |
| PC Fault Detection — Complete Dataset Builder |
| =============================================== |
| Builds a real multimodal dataset from 4 sources: |
| 1. YouTube scraping (audio + video frames) |
| 2. HuggingFace datasets (cooling-fans, FSD50K) |
| 3. Synthetic audio generation (beep codes, fan noise augmentation) |
| 4. Synthetic visual generation (BSOD, POST screens, SMART errors, thermal warnings) |
| |
| NO API KEYS NEEDED — everything uses public tools. |
| |
| Output: HuggingFace dataset at Ellaft/pc-fault-real-dataset |
| Columns: audio (Audio), image (Image), fault_class (int), fault_name (str), source (str) |
| |
| Usage: |
| python build_dataset.py # Build everything |
| python build_dataset.py --skip_youtube # Skip YouTube (slow), use synth+HF only |
| python build_dataset.py --youtube_only # Only scrape YouTube |
| python build_dataset.py --upload # Upload to HuggingFace Hub |
| python build_dataset.py --max_per_class 200 # Limit samples per class |
| |
| Requirements: |
| pip install -r requirements_data.txt |
| # Also needs ffmpeg for video frame extraction: |
| # Ubuntu: sudo apt install ffmpeg |
| # Mac: brew install ffmpeg |
| # Or: pip install imageio-ffmpeg (Python-only fallback) |
| """ |
|
|
| import os, sys, json, random, glob, shutil, argparse, subprocess, re |
| import numpy as np |
| from pathlib import Path |
| from PIL import Image, ImageDraw, ImageFont |
| from collections import Counter, defaultdict |
|
|
| |
| |
| |
# Integer class id -> fault name. The ids follow the order of this tuple.
FAULT_CLASSES = dict(enumerate((
    "normal_operation",
    "boot_failure",
    "overheating_fan",
    "storage_failure",
    "system_crash",
)))
# Reverse lookup: fault name -> integer class id.
FAULT_NAME_TO_ID = {name: class_id for class_id, name in FAULT_CLASSES.items()}

# Working directories, all relative to the current working directory.
DATA_ROOT = Path("./dataset_build")
AUDIO_DIR = DATA_ROOT / "audio"
IMAGE_DIR = DATA_ROOT / "image"
FINAL_DIR = DATA_ROOT / "final"

# Audio clips are synthesized at SAMPLE_RATE samples per second and
# AUDIO_DURATION seconds long; images are resized to IMAGE_SIZE (width, height).
SAMPLE_RATE = 16000
AUDIO_DURATION = 5.0
IMAGE_SIZE = (224, 224)
|
|
|
|
| |
| |
| |
|
|
| |
# YouTube search phrases, grouped by the fault-class name they are scraped for.
# Every key is one of the fault names; each phrase is passed verbatim to a
# yt-dlp "ytsearch" query.
YOUTUBE_QUERIES = {
    "normal_operation": [
        "quiet gaming PC idle fan noise ambient",
        "silent PC build running quiet computer sound",
        "computer fan white noise sleep 1 hour",
        "desktop PC running normally ambient sound",
    ],
    "boot_failure": [
        "BIOS beep codes explained AMI Award",
        "computer beep codes troubleshooting POST",
        "motherboard beep code 3 long beeps",
        "PC won't boot beeping sound BIOS",
        "UEFI boot failure no display beep",
    ],
    "overheating_fan": [
        "loud PC fan noise grinding bearing failure",
        "CPU fan rattling noise overheating computer",
        "graphics card fan loud grinding noise",
        "laptop fan very loud overheating spinning",
        "PC fan bearing failure wobble noise repair",
    ],
    "storage_failure": [
        "hard drive clicking noise dying HDD sound",
        "HDD click of death failing hard drive",
        "hard drive failure sounds different brands",
        "SSD failure symptoms clicking grinding noise",
        "hard disk bad sector read error sound",
    ],
    "system_crash": [
        "Windows blue screen of death BSOD live",
        "computer crash freeze blue screen error",
        "Windows 10 11 BSOD crash stop code",
        "kernel panic Linux system crash",
        "PC randomly crashes blue screen gaming",
    ],
}
|
|
|
|
| def _find_ffmpeg(): |
| """Find ffmpeg binary — system or Python package.""" |
| |
| try: |
| result = subprocess.run(["ffmpeg", "-version"], capture_output=True, timeout=5) |
| if result.returncode == 0: |
| return "ffmpeg" |
| except (FileNotFoundError, subprocess.TimeoutExpired): |
| pass |
| |
| try: |
| import imageio_ffmpeg |
| return imageio_ffmpeg.get_ffmpeg_exe() |
| except ImportError: |
| pass |
| return None |
|
|
|
|
def scrape_youtube(fault_class, query, output_dir, max_videos=5, max_duration=120):
    """Download audio and video frames for one YouTube search query.

    ``yt-dlp`` is run twice for the same ``ytsearch`` query: once extracting
    audio as WAV into ``output_dir/audio/<fault_class>`` and once fetching the
    lowest-quality MP4 into ``output_dir/frames/<fault_class>``. Every
    downloaded video is then sampled with ffmpeg into 224x224 JPEG frames
    (``fps=0.5``, i.e. one frame every two seconds) and deleted.

    All external-tool failures are reported on stdout and otherwise ignored;
    this function is best-effort and does not raise for them.

    Args:
        fault_class: Class name used as the sub-directory for the outputs.
        query: Free-text YouTube search query.
        output_dir: ``pathlib.Path`` under which ``audio/`` and ``frames/``
            are created.
        max_videos: Number of search results requested and download cap.
        max_duration: Passed to yt-dlp as ``--match-filter duration<N``.

    Returns:
        ``(n_audio, n_frames)``: how many WAV files and JPEG frames this call
        added. Files that already existed in the class directory (from an
        earlier query for the same class) are not counted again, so callers
        can safely sum the results over several queries.
    """
    audio_out = output_dir / "audio" / fault_class
    frames_out = output_dir / "frames" / fault_class
    audio_out.mkdir(parents=True, exist_ok=True)
    frames_out.mkdir(parents=True, exist_ok=True)

    # Several queries share one class directory, so remember what is already
    # there in order to report only the files this query contributed.
    audio_before = {p.name for p in audio_out.glob("*.wav")}
    frames_before = {p.name for p in frames_out.glob("*.jpg")}

    print(f" Searching: '{query}' (max {max_videos} videos)...")

    audio_cmd = [
        "yt-dlp",
        f"ytsearch{max_videos}:{query}",
        "--extract-audio",
        "--audio-format", "wav",
        "--audio-quality", "0",
        "--match-filter", f"duration<{max_duration}",
        "--max-downloads", str(max_videos),
        "--no-playlist",
        "--quiet",
        "--no-warnings",
        "-o", str(audio_out / "%(id)s.%(ext)s"),
    ]

    video_cmd = [
        "yt-dlp",
        f"ytsearch{max_videos}:{query}",
        "--format", "worst[ext=mp4]",
        "--match-filter", f"duration<{max_duration}",
        "--max-downloads", str(max_videos),
        "--no-playlist",
        "--quiet",
        "--no-warnings",
        "-o", str(frames_out / "%(id)s.%(ext)s"),
    ]

    for label, cmd in (("Audio", audio_cmd), ("Video", video_cmd)):
        try:
            subprocess.run(cmd, timeout=180, capture_output=True)
        except (subprocess.SubprocessError, OSError) as e:
            # Covers the 180 s timeout and a missing/unrunnable yt-dlp binary.
            print(f" ⚠ {label} download timeout/error: {e}")

    ffmpeg_bin = _find_ffmpeg()
    if ffmpeg_bin:
        video_files = list(frames_out.glob("*.mp4")) + list(frames_out.glob("*.webm"))
        for vf in video_files:
            frame_prefix = frames_out / f"{vf.stem}_frame"
            ffmpeg_cmd = [
                ffmpeg_bin, "-i", str(vf),
                "-vf", "fps=0.5,scale=224:224",
                "-q:v", "2",
                "-y", "-loglevel", "error",
                str(frame_prefix) + "_%04d.jpg",
            ]
            try:
                subprocess.run(ffmpeg_cmd, timeout=60, capture_output=True)
            except (subprocess.SubprocessError, OSError) as e:
                # Keep going with the remaining videos, but say what was lost.
                print(f" ⚠ Frame extraction failed for {vf.name}: {e}")
            # The video is only a source for frames; never keep it.
            vf.unlink(missing_ok=True)
    else:
        print(" ⚠ ffmpeg not found, skipping frame extraction")
        print(" Install: sudo apt install ffmpeg OR pip install imageio-ffmpeg")

    n_audio = len({p.name for p in audio_out.glob("*.wav")} - audio_before)
    n_frames = len({p.name for p in frames_out.glob("*.jpg")} - frames_before)
    print(f" ✓ {n_audio} audio files, {n_frames} frames")
    return n_audio, n_frames
|
|
|
|
def run_youtube_scraper(output_dir, max_videos_per_query=3):
    """Run every query in ``YOUTUBE_QUERIES`` through ``scrape_youtube``.

    Args:
        output_dir: Root directory handed to ``scrape_youtube``.
        max_videos_per_query: Forwarded as ``max_videos`` for each query.

    Returns:
        Dict mapping each fault-class name to
        ``{"audio": <sum>, "frames": <sum>}``, where the sums add up the
        ``(audio, frames)`` pairs returned by ``scrape_youtube`` for that
        class's queries.
    """
    rule = "="*60
    print("\n" + rule)
    print("PART 1: YouTube Scraping")
    print(rule)
    print("(No API key needed — uses yt-dlp public scraping)")
    print("(If blocked, run on your local machine or Google Colab)")

    stats = {}
    for fault_class, queries in YOUTUBE_QUERIES.items():
        print(f"\n[{fault_class}]")
        # One (n_audio, n_frames) pair per query, scraped in listed order.
        per_query = [
            scrape_youtube(
                fault_class, phrase, output_dir,
                max_videos=max_videos_per_query, max_duration=120)
            for phrase in queries
        ]
        audio_sum = sum(pair[0] for pair in per_query)
        frame_sum = sum(pair[1] for pair in per_query)
        stats[fault_class] = {"audio": audio_sum, "frames": frame_sum}
        print(f" Total: {audio_sum} audio, {frame_sum} frames")

    return stats
|
|
|
|
| |
| |
| |
|
|
def _copy_cooling_fans_snapshot(audio_out, max_per_class):
    """Fallback for download_cooling_fans: copy WAVs from a raw repo snapshot.

    Files are labelled from their path *inside* the snapshot: a path
    containing "abnormal" goes to overheating_fan, otherwise one containing
    "normal" goes to normal_operation. Returns the per-class copy counts.
    """
    from huggingface_hub import snapshot_download

    root = snapshot_download("HenriqueFrancaa/cooling-fans-db0", repo_type="dataset")
    # Sorted so the files kept under the per-class cap are the same on every run.
    wav_files = sorted(glob.glob(os.path.join(root, "**/*.wav"), recursive=True))
    print(f" Found {len(wav_files)} WAV files via snapshot")

    normal_count, abnormal_count = 0, 0
    for wf in wav_files:
        # Match against the path relative to the snapshot root so that a
        # cache or home directory name cannot influence the label.
        rel_lower = os.path.relpath(wf, root).lower()
        # "abnormal" contains "normal", so it has to be tested first.
        if "abnormal" in rel_lower and abnormal_count < max_per_class:
            out_path = audio_out / f"overheating_fan_cfan_{abnormal_count:04d}.wav"
            shutil.copy2(wf, out_path)
            abnormal_count += 1
        elif "normal" in rel_lower and normal_count < max_per_class:
            out_path = audio_out / f"normal_operation_cfan_{normal_count:04d}.wav"
            shutil.copy2(wf, out_path)
            normal_count += 1
    print(f" ✓ Saved {normal_count} normal, {abnormal_count} abnormal fan clips")
    return {"normal_operation": normal_count, "overheating_fan": abnormal_count}


def download_cooling_fans(output_dir, max_per_class=200):
    """Fetch cooling-fan recordings from HenriqueFrancaa/cooling-fans-db0.

    Clips are written to ``output_dir/hf_audio`` as
    ``normal_operation_cfan_NNNN.wav`` and ``overheating_fan_cfan_NNNN.wav``.
    The dataset is loaded with ``datasets.load_dataset``; if that fails, the
    raw repository snapshot is downloaded and its WAV files are copied instead.

    Args:
        output_dir: ``pathlib.Path`` under which ``hf_audio/`` is created.
        max_per_class: Upper bound on clips saved for each of the two classes.

    Returns:
        ``{"normal_operation": n, "overheating_fan": m}``, or ``{}`` when both
        the dataset load and the snapshot fallback fail.
    """
    print("\n" + "="*60)
    print("PART 2a: Cooling Fans Dataset (HuggingFace)")
    print("="*60)

    from datasets import load_dataset
    import soundfile as sf

    audio_out = output_dir / "hf_audio"
    audio_out.mkdir(parents=True, exist_ok=True)

    try:
        ds = load_dataset("HenriqueFrancaa/cooling-fans-db0", split="train")
        print(f" Loaded {len(ds)} samples")
    except Exception as e:
        print(f" ⚠ Failed to load cooling-fans: {e}")
        try:
            return _copy_cooling_fans_snapshot(audio_out, max_per_class)
        except Exception as e2:
            print(f" ✗ Snapshot also failed: {e2}")
            return {}

    normal_count, abnormal_count = 0, 0
    for i, sample in enumerate(ds):
        audio = sample["audio"]
        arr = np.array(audio["array"], dtype=np.float32)
        sr = audio["sampling_rate"]

        # NOTE(review): the class is assigned from index parity, not from any
        # label carried by the sample, and once the normal quota is full even
        # indices fall through to the abnormal branch. This looks like a
        # placeholder; confirm the dataset's label column and use it instead.
        if i % 2 == 0 and normal_count < max_per_class:
            out_path = audio_out / f"normal_operation_cfan_{normal_count:04d}.wav"
            sf.write(str(out_path), arr, sr)
            normal_count += 1
        elif abnormal_count < max_per_class:
            out_path = audio_out / f"overheating_fan_cfan_{abnormal_count:04d}.wav"
            sf.write(str(out_path), arr, sr)
            abnormal_count += 1

        if normal_count >= max_per_class and abnormal_count >= max_per_class:
            break

    print(f" ✓ Saved {normal_count} normal, {abnormal_count} abnormal fan clips")
    return {"normal_operation": normal_count, "overheating_fan": abnormal_count}
|
|
|
|
def download_fsd50k_relevant(output_dir, max_per_class=100):
    """Save FSD50K validation clips whose label matches a fault-class keyword.

    Each sample's ``label`` field is lower-cased and compared against the
    keyword lists below. The first fault class (in ``LABEL_MAP`` order) that
    matches and still has room receives the clip, written to
    ``output_dir/hf_audio/<fault_class>_fsd_NNNN.wav``.

    Keywords are matched as whole words (letters/digits on either side block
    a match), so ``"hum"`` matches ``hum`` but not ``human_voice``.

    Args:
        output_dir: ``pathlib.Path`` under which ``hf_audio/`` is created.
        max_per_class: Upper bound on clips saved per fault class.

    Returns:
        Dict mapping fault-class name to the number of clips saved; classes
        that never matched are absent. ``{}`` when the dataset fails to load.
    """
    print("\n" + "="*60)
    print("PART 2b: FSD50K Sound Events (HuggingFace)")
    print("="*60)

    from datasets import load_dataset
    import soundfile as sf

    audio_out = output_dir / "hf_audio"
    audio_out.mkdir(parents=True, exist_ok=True)

    LABEL_MAP = {
        "overheating_fan": ["fan", "mechanical_fan", "whir", "buzz", "hum", "engine"],
        "boot_failure": ["beep", "bleep", "alarm", "buzzer", "siren"],
        "storage_failure": ["click", "tick", "ratchet", "mechanical"],
        "normal_operation": ["keyboard", "typing", "mouse", "computer_keyboard"],
    }
    # One compiled pattern per class. "_" , "," and "-" count as separators,
    # so "fan" still matches "mechanical_fan" and "tick" matches "tick-tock".
    label_patterns = {
        fault_class: re.compile(
            r"(?<![a-z0-9])(?:" + "|".join(re.escape(kw) for kw in keywords) + r")(?![a-z0-9])"
        )
        for fault_class, keywords in LABEL_MAP.items()
    }

    try:
        ds = load_dataset("Fhrozen/FSD50k", split="validation")
        print(f" Loaded {len(ds)} samples from FSD50K validation split")
    except Exception as e:
        print(f" ⚠ Failed to load FSD50K: {e}")
        return {}

    counts = defaultdict(int)
    full_classes = set()
    for sample in ds:
        # Assumes "label" holds label names as text — TODO confirm against
        # the dataset card; integer class ids would never match a keyword.
        label = str(sample.get("label", "")).lower()
        if not label:
            continue

        for fault_class, pattern in label_patterns.items():
            if pattern.search(label) and counts[fault_class] < max_per_class:
                audio = sample["audio"]
                arr = np.array(audio["array"], dtype=np.float32)
                sr = audio["sampling_rate"]
                idx = counts[fault_class]
                out_path = audio_out / f"{fault_class}_fsd_{idx:04d}.wav"
                sf.write(str(out_path), arr, sr)
                counts[fault_class] += 1
                if counts[fault_class] >= max_per_class:
                    full_classes.add(fault_class)
                break

        # Nothing more can be saved once every class has hit its cap.
        if len(full_classes) == len(LABEL_MAP):
            break

    for fc, c in counts.items():
        print(f" ✓ {fc}: {c} clips from FSD50K")
    return dict(counts)
|
|
|
|
| |
| |
| |
|
|
def _synthesize_beep_pattern(pattern, freq=1000, jitter=0.1):
    """Render a beep/silence pattern as a fixed-length mono clip.

    Args:
        pattern: Iterable of ``(duration_ms, is_beep)`` segments.
        freq: Tone frequency in Hz for the beep segments.
        jitter: Each segment's duration is scaled by a random factor drawn
            uniformly from ``[1 - jitter, 1 + jitter]``.

    Returns:
        ``float32`` array of exactly ``int(SAMPLE_RATE * AUDIO_DURATION)``
        samples. The rendered pattern is repeated to fill the clip, or cut
        if it is longer. An empty (or zero-length) pattern yields silence.
    """
    segments = []
    for duration_ms, is_beep in pattern:
        actual_ms = duration_ms * (1 + random.uniform(-jitter, jitter))
        n_samples = int(SAMPLE_RATE * actual_ms / 1000)

        if not is_beep:
            segments.append(np.zeros(n_samples))
            continue

        t = np.linspace(0, actual_ms / 1000, n_samples)
        sine = np.sin(2 * np.pi * freq * t)
        # Mostly square wave with some sine mixed in; peak amplitude 0.5.
        beep = 0.7 * (0.5 * np.sign(sine)) + 0.3 * (0.5 * sine)

        # Linear fade-in/out of up to 200 samples at each end of the beep.
        envelope = np.ones(n_samples)
        attack = min(200, n_samples // 4)
        if attack > 0:
            # Guarded: with attack == 0, envelope[-0:] would address the
            # whole array and the assignment would fail to broadcast.
            envelope[:attack] = np.linspace(0, 1, attack)
            envelope[-attack:] = np.linspace(1, 0, attack)
        segments.append(beep * envelope)

    target_len = int(SAMPLE_RATE * AUDIO_DURATION)
    result = np.concatenate(segments) if segments else np.zeros(0)
    if len(result) == 0:
        # Nothing to repeat; avoid dividing by a zero length below.
        return np.zeros(target_len, dtype=np.float32)

    if len(result) < target_len:
        repeats = target_len // len(result) + 1
        result = np.tile(result, repeats)[:target_len]
    else:
        result = result[:target_len]

    return result.astype(np.float32)
|
|
|
|
def generate_beep_codes(output_dir, n_per_pattern=30):
    """Write synthetic BIOS beep-code clips to ``output_dir/synth_audio``.

    For every failure pattern below, ``n_per_pattern`` noisy variations are
    saved as ``boot_failure_beep_NNNN.wav``; a further ``n_per_pattern``
    single-beep clips are saved as ``normal_operation_beep_NNNN.wav``.
    All files are 16-bit PCM at ``SAMPLE_RATE``.

    Returns:
        The number of boot_failure clips written (normal clips not included).
    """
    print("\n" + "="*60)
    print("PART 3a: Synthetic BIOS Beep Codes")
    print("="*60)

    import scipy.io.wavfile as wav

    clip_dir = output_dir / "synth_audio"
    clip_dir.mkdir(parents=True, exist_ok=True)

    def save_pcm16(path, samples):
        # Clamp to [-1, 1], then scale to the int16 range.
        clipped = np.clip(samples, -1, 1)
        wav.write(str(path), SAMPLE_RATE, (clipped * 32767).astype(np.int16))

    # Each entry is a list of (duration_ms, is_beep) segments.
    failure_patterns = {
        "ami_dram_refresh": [(800, True), (200, False), (200, True), (200, False)],
        "ami_memory_parity": [(200, True), (100, False)] * 3,
        "ami_base_memory": [(200, True), (100, False)] * 4,
        "ami_timer_failure": [(200, True), (100, False)] * 5,
        "ami_cpu_failure": [(200, True), (100, False)] * 6,
        "ami_keyboard_ctrl": [(200, True), (100, False)] * 7,
        "ami_video_failure": [(800, True), (200, False), (800, True), (200, False), (200, True)],
        "award_general_failure": [(800, True), (300, False)] * 1,
        "award_video_error": [(800, True), (200, False), (200, True), (200, False), (200, True)],
        "award_no_video": [(800, True), (200, False)] * 2,
        "phoenix_cpu_error": [(200, True), (300, False), (200, True), (300, False), (200, True), (500, False), (200, True), (300, False), (200, True)],
        "phoenix_memory_error": [(200, True), (300, False), (200, True), (500, False), (200, True), (300, False), (200, True), (300, False), (200, True)],
    }
    healthy_pattern = [(200, True), (300, False)]

    written = 0
    for segments in failure_patterns.values():
        for _ in range(n_per_pattern):
            tone_hz = random.uniform(800, 1200)
            clip = _synthesize_beep_pattern(segments, tone_hz, jitter=0.15)
            hiss = np.random.randn(len(clip)) * random.uniform(0.005, 0.02)
            save_pcm16(clip_dir / f"boot_failure_beep_{written:04d}.wav", clip + hiss)
            written += 1

    for index in range(n_per_pattern):
        tone_hz = random.uniform(900, 1100)
        clip = _synthesize_beep_pattern(healthy_pattern, tone_hz, jitter=0.1)
        hiss = np.random.randn(len(clip)) * 0.005
        save_pcm16(clip_dir / f"normal_operation_beep_{index:04d}.wav", clip + hiss)

    print(f" ✓ Generated {written} boot_failure beep codes + {n_per_pattern} normal POST beeps")
    return written
|
|
|
|
def generate_crash_audio(output_dir, n_samples=100):
    """Write synthetic system-crash clips to ``output_dir/synth_audio``.

    Each clip is one of four randomly chosen kinds: a decaying noise burst,
    a looped "glitch" chunk with silent gaps, a short noise onset followed by
    near-silence ("hang"), or a rising two-tone "feedback" sine. Files are
    named ``system_crash_synth_NNNN.wav`` (16-bit PCM at ``SAMPLE_RATE``).

    Returns:
        The number of clips written.
    """
    print("\n" + "="*60)
    print("PART 3b: Synthetic System Crash Audio")
    print("="*60)

    import scipy.io.wavfile as wav

    clip_dir = output_dir / "synth_audio"
    clip_dir.mkdir(parents=True, exist_ok=True)

    total = int(SAMPLE_RATE * AUDIO_DURATION)

    def noise_burst():
        # Silence with one exponentially decaying burst in the first half.
        clip = np.zeros(total, dtype=np.float32)
        onset = random.randint(0, total // 2)
        length = random.randint(SAMPLE_RATE // 4, SAMPLE_RATE * 2)
        burst = np.random.randn(min(length, total - onset)).astype(np.float32)
        burst *= random.uniform(0.3, 0.8)
        fade = np.exp(-np.linspace(0, 3, len(burst)))
        clip[onset:onset + len(burst)] = burst * fade
        return clip

    def glitch():
        # A tiny noise chunk looped for the whole clip, with 1-5 muted gaps.
        chunk_len = random.randint(50, 500)
        chunk = np.random.randn(chunk_len).astype(np.float32) * 0.3
        clip = np.tile(chunk, total // chunk_len + 1)[:total]
        for _ in range(random.randint(1, 5)):
            gap_start = random.randint(0, total - SAMPLE_RATE)
            gap_len = random.randint(SAMPLE_RATE // 10, SAMPLE_RATE)
            clip[gap_start:gap_start + gap_len] = 0
        return clip

    def hang():
        # Louder noise for up to one second, then a very quiet noise floor.
        clip = np.random.randn(total).astype(np.float32) * 0.01
        cutoff = random.randint(SAMPLE_RATE // 4, SAMPLE_RATE)
        clip[:cutoff] = np.random.randn(cutoff) * 0.2
        return clip

    def feedback():
        # Sine that ramps up in level, plus a constant tone one octave above.
        freq = random.uniform(200, 2000)
        t = np.linspace(0, AUDIO_DURATION, total)
        clip = 0.4 * np.sin(2 * np.pi * freq * t)
        clip *= np.linspace(0.1, 1.0, total)
        clip += 0.2 * np.sin(2 * np.pi * freq * 2 * t)
        return clip.astype(np.float32)

    builders = {
        "noise_burst": noise_burst,
        "glitch": glitch,
        "hang": hang,
        "feedback": feedback,
    }
    kinds = ["noise_burst", "glitch", "hang", "feedback"]

    written = 0
    for _ in range(n_samples):
        kind = random.choice(kinds)
        samples = np.clip(builders[kind](), -1, 1)
        target = clip_dir / f"system_crash_synth_{written:04d}.wav"
        wav.write(str(target), SAMPLE_RATE, (samples * 32767).astype(np.int16))
        written += 1

    print(f" ✓ Generated {written} system_crash audio samples")
    return written
|
|
|
|
def generate_hdd_click_audio(output_dir, n_samples=100):
    """Write synthetic failing-drive clips to ``output_dir/synth_audio``.

    Every clip layers a low-frequency hum (40-80 Hz), a train of short
    decaying noise clicks at a random regular interval, and, with
    probability 0.3, a stretch of amplitude-modulated noise ("grind").
    Files are named ``storage_failure_hdd_NNNN.wav`` (16-bit PCM).

    Returns:
        The number of clips written.
    """
    print("\n" + "="*60)
    print("PART 3c: Synthetic HDD Click Audio")
    print("="*60)

    import scipy.io.wavfile as wav

    clip_dir = output_dir / "synth_audio"
    clip_dir.mkdir(parents=True, exist_ok=True)

    total = int(SAMPLE_RATE * AUDIO_DURATION)
    written = 0

    for _ in range(n_samples):
        clip = np.zeros(total, dtype=np.float32)

        # Motor hum.
        hum_hz = random.uniform(40, 80)
        timeline = np.linspace(0, AUDIO_DURATION, total)
        hum_level = random.uniform(0.02, 0.08)
        clip += hum_level * np.sin(2 * np.pi * hum_hz * timeline)

        # Click train: evenly spaced, each nudged by up to 100 samples.
        spacing_s = random.uniform(0.3, 1.5)
        click_total = int(AUDIO_DURATION / spacing_s)
        click_len = int(SAMPLE_RATE * random.uniform(0.005, 0.02))
        click_decay = np.exp(-np.linspace(0, 8, click_len))

        for k in range(click_total):
            start = int(k * spacing_s * SAMPLE_RATE) + random.randint(-100, 100)
            # Keep the whole click inside the clip.
            start = max(0, min(start, total - click_len))
            click = np.random.randn(click_len) * random.uniform(0.2, 0.6)
            click *= click_decay
            clip[start:start + click_len] += click.astype(np.float32)

        # Occasional grinding stretch.
        if random.random() < 0.3:
            grind_at = random.randint(0, total // 2)
            grind_n = random.randint(SAMPLE_RATE // 2, SAMPLE_RATE * 2)
            grind_n = min(grind_n, total - grind_at)
            grind = np.random.randn(grind_n) * 0.15
            mod_hz = random.uniform(500, 3000)
            grind_t = np.linspace(0, grind_n / SAMPLE_RATE, grind_n)
            grind *= (1 + 0.5 * np.sin(2 * np.pi * mod_hz * grind_t))
            clip[grind_at:grind_at + grind_n] += grind.astype(np.float32)

        clip = np.clip(clip, -1, 1)
        target = clip_dir / f"storage_failure_hdd_{written:04d}.wav"
        wav.write(str(target), SAMPLE_RATE, (clip * 32767).astype(np.int16))
        written += 1

    print(f" ✓ Generated {written} storage_failure HDD click audio samples")
    return written
|
|
|
|
| |
| |
| |
|
|
# (hex code, symbolic name) pairs used as text on the synthetic BSOD images.
# NOTE(review): "KERNEL_SECURITY_CHECK_FAILURE" appears twice (0xC0000005 and
# 0x00000139); the 0xC0000005 pairing looks wrong — confirm before relying
# on code/name agreement.
BSOD_ERRORS = [
    ("0x0000007E", "SYSTEM_THREAD_EXCEPTION_NOT_HANDLED"),
    ("0x0000003B", "SYSTEM_SERVICE_EXCEPTION"),
    ("0x00000050", "PAGE_FAULT_IN_NONPAGED_AREA"),
    ("0x0000001A", "MEMORY_MANAGEMENT"),
    ("0x000000EF", "CRITICAL_PROCESS_DIED"),
    ("0xC0000005", "KERNEL_SECURITY_CHECK_FAILURE"),
    ("0x00000133", "DPC_WATCHDOG_VIOLATION"),
    ("0x000000D1", "DRIVER_IRQL_NOT_LESS_OR_EQUAL"),
    ("0x0000007F", "UNEXPECTED_KERNEL_MODE_TRAP"),
    ("0x00000124", "WHEA_UNCORRECTABLE_ERROR"),
    ("0x0000000A", "IRQL_NOT_LESS_OR_EQUAL"),
    ("0x0000001E", "KMODE_EXCEPTION_NOT_HANDLED"),
    ("0x000000C5", "DRIVER_CORRUPTED_EXPOOL"),
    ("0x00000019", "BAD_POOL_HEADER"),
    ("0x00000139", "KERNEL_SECURITY_CHECK_FAILURE"),
]

# Error lines drawn on the synthetic BIOS POST failure screens.
POST_ERRORS = [
    "ERROR: Boot device not found. Press F1 to continue...",
    "CMOS checksum error - Defaults loaded",
    "Keyboard error or no keyboard present",
    "NTLDR is missing. Press Ctrl+Alt+Del to restart",
    "DISK BOOT FAILURE, INSERT SYSTEM DISK AND PRESS ENTER",
    "Reboot and Select proper Boot device",
    "No boot device available - strike F1 to retry boot",
    "ERROR: CPU fan not detected. Press F1 to Resume.",
    "Primary IDE master failure. Press F1 to continue.",
    "CMOS Battery State Low. Press F1 to continue.",
    "Alert! Previous Reboot was due to voltage regulator failure",
]

# Messages drawn on the synthetic storage-failure warning screens.
SMART_ERRORS = [
    "SMART Failure Predicted on Hard Disk 0: WDC WD10EZEX-21WN4A0",
    "SMART Failure Predicted on Hard Disk 1: Seagate ST1000LM024",
    "WARNING: SMART Self-test log: FAILED read element",
    "Current Pending Sector Count: 208 (WARNING)",
    "Reallocated Sector Count: 1624 (CRITICAL)",
    "SMART error: Raw Read Error Rate exceeds threshold",
    "Uncorrectable Sector Count: 48 (FAILED)",
    "NTFS FILE SYSTEM: Volume C: is corrupt and unreadable",
    "Disk read error occurred. Press Ctrl+Alt+Del to restart.",
    "BAD_SYSTEM_CONFIG_INFO: Hard disk error detected",
]
|
|
|
|
def generate_bsod_images(output_dir, n_samples=200):
    """Write ``n_samples`` synthetic BSOD JPEGs for the system_crash class.

    For every image a Windows style (10, 11, 7 or XP) and an entry of
    ``BSOD_ERRORS`` are picked at random, the matching renderer is called,
    and the result is resized to ``IMAGE_SIZE``. About 30% of the images get
    small per-pixel noise. Output goes to
    ``output_dir/synth_images/system_crash/bsod_NNNN.jpg`` with a random JPEG
    quality between 75 and 95.
    """
    print("\n" + "="*60)
    print("PART 4a: Synthetic BSOD Images")
    print("="*60)

    target_dir = output_dir / "synth_images" / "system_crash"
    target_dir.mkdir(parents=True, exist_ok=True)

    for index in range(n_samples):
        windows = random.choice([10, 11, 7, "xp"])
        stop_code, stop_name = random.choice(BSOD_ERRORS)

        if windows == 7:
            screen = _generate_win7_bsod(stop_code, stop_name)
        elif windows in [10, 11]:
            screen = _generate_win10_bsod(stop_code, stop_name, windows)
        else:
            screen = _generate_winxp_bsod(stop_code, stop_name)

        screen = screen.resize(IMAGE_SIZE, Image.LANCZOS)

        if random.random() < 0.3:
            # Work in int16 so adding noise cannot wrap around in uint8.
            pixels = np.array(screen)
            jitter = np.random.randint(-5, 5, pixels.shape, dtype=np.int16)
            noisy = np.clip(pixels.astype(np.int16) + jitter, 0, 255)
            screen = Image.fromarray(noisy.astype(np.uint8))

        destination = str(target_dir / f"bsod_{index:04d}.jpg")
        screen.save(destination, quality=random.randint(75, 95))

    print(f" ✓ Generated {n_samples} BSOD images")
|
|
|
|
def _generate_win10_bsod(code, msg, version=10):
    """Draw a 1920x1080 Windows 10/11-style stop screen and return the image.

    The background is "#0078D7" when ``version == 10`` and "#000078"
    otherwise. ``msg`` is printed as the stop code; ``code`` is accepted for
    signature parity with the other renderers but is not drawn. The progress
    percentage and the fake QR pattern are randomized.
    """
    width, height = 1920, 1080
    background = "#0078D7" if version == 10 else "#000078"
    canvas = Image.new("RGB", (width, height), color=background)
    pen = ImageDraw.Draw(canvas)

    margin = int(width*0.05)
    pen.text((margin, int(height*0.08)), ":(", fill="white")

    top = int(height * 0.25)
    message_lines = (
        (0, "Your PC ran into a problem and needs to restart."),
        (40, "We're just collecting some error info, and then we'll"),
        (70, "restart for you."),
    )
    for offset, text in message_lines:
        pen.text((margin, top + offset), text, fill="white")

    progress = random.randint(0, 100)
    pen.text((margin, top + 130), f"{progress}% complete", fill="white")

    # Fake QR code: a white square sprinkled with 50 small black squares.
    qr_left, qr_top = int(width*0.05), int(height*0.6)
    pen.rectangle([qr_left, qr_top, qr_left + 100, qr_top + 100], fill="white")
    for _ in range(50):
        dot_x = random.randint(qr_left + 5, qr_left + 95)
        dot_y = random.randint(qr_top + 5, qr_top + 95)
        dot_size = random.randint(3, 8)
        pen.rectangle([dot_x, dot_y, dot_x + dot_size, dot_y + dot_size], fill="black")

    info_x = int(width*0.15)
    pen.text((info_x, int(height*0.72)), "If you call a support person, give them this info:", fill="white")
    pen.text((info_x, int(height*0.76)), f"Stop code: {msg}", fill="white")
    return canvas
|
|
|
|
def _generate_win7_bsod(code, msg):
    """Draw a 1920x1080 Windows 7-style text BSOD and return the image.

    White text on a "#000080" background, one line every 22 pixels starting
    at (40, 40). ``code`` and ``msg`` appear in the ``*** STOP`` lines.
    """
    canvas = Image.new("RGB", (1920, 1080), color="#000080")
    pen = ImageDraw.Draw(canvas)

    text_rows = [
        "A problem has been detected and Windows has been shut down to prevent",
        "damage to your computer.",
        "",
        f"*** STOP: {code} ({msg})",
        "",
        "If this is the first time you've seen this error screen,",
        "restart your computer. If this screen appears again, follow these steps:",
        "",
        "Check to make sure any new hardware or software is properly installed.",
        "If this is a new installation, ask your hardware or software manufacturer",
        "for any Windows updates you might need.",
        "",
        "Technical information:",
        f"*** STOP: {code} (0x00000001, 0x00000002, 0x00000000, 0x00000000)",
    ]
    for row, text in enumerate(text_rows):
        pen.text((40, 40 + 22 * row), text, fill="white")
    return canvas
|
|
|
|
def _generate_winxp_bsod(code, msg):
    """Draw a 1024x768 Windows XP-style text BSOD and return the image.

    White text on a "#000080" background, one line every 20 pixels starting
    at (20, 30). ``code`` and ``msg`` appear in the ``STOP:`` line.
    """
    canvas = Image.new("RGB", (1024, 768), color="#000080")
    pen = ImageDraw.Draw(canvas)

    text_rows = [
        "A problem has been detected and Windows has been shut down to prevent",
        "damage to your computer.",
        "",
        f"STOP: {code} {msg}",
        "",
        "Beginning dump of physical memory.",
        "Physical memory dump complete.",
        "Contact your system administrator or technical support group.",
    ]
    for row, text in enumerate(text_rows):
        pen.text((20, 30 + 20 * row), text, fill="white")
    return canvas
|
|
|
|
def generate_post_screens(output_dir, n_samples=200):
    """Write ``n_samples`` synthetic BIOS POST failure JPEGs (boot_failure).

    Each image is a black screen with a random vendor banner, CPU line,
    memory size, up to two device lines (each shown with probability 0.8),
    one random entry of ``POST_ERRORS`` in red and a key prompt. Images are
    resized to ``IMAGE_SIZE`` and saved to
    ``output_dir/synth_images/boot_failure/post_NNNN.jpg``.
    """
    print("\n" + "="*60)
    print("PART 4b: Synthetic BIOS POST Screens")
    print("="*60)

    target_dir = output_dir / "synth_images" / "boot_failure"
    target_dir.mkdir(parents=True, exist_ok=True)

    vendor_banners = [
        "American Megatrends Inc. AMIBIOS (C)2024",
        "Award Modular BIOS v6.00PG",
        "Phoenix - AwardBIOS v6.00PG",
        "InsydeH2O Version 05.24.03.0007",
    ]
    cpu_names = [
        "Intel(R) Core(TM) i7-12700K @ 3.60GHz",
        "AMD Ryzen 7 5800X 8-Core Processor",
        "Intel(R) Core(TM) i9-14900K @ 3.20GHz",
        "AMD Ryzen 9 7950X 16-Core Processor",
    ]
    ok_color = "#AAFFAA"
    left = 20

    for index in range(n_samples):
        width, height = random.choice([(1920, 1080), (1024, 768)])
        screen = Image.new("RGB", (width, height), color="#000000")
        pen = ImageDraw.Draw(screen)

        cursor_y = 20
        pen.text((left, cursor_y), random.choice(vendor_banners), fill="#AAAAAA")
        cursor_y += 25
        pen.text((left, cursor_y), f"CPU: {random.choice(cpu_names)}", fill=ok_color)
        cursor_y += 20
        ram_mb = random.choice([4096, 8192, 16384, 32768])
        pen.text((left, cursor_y), f"Memory Test: {ram_mb}MB", fill=ok_color)
        cursor_y += 20

        device_checks = [
            ("IDE Primary Master: WDC WD10EZEX", True),
            ("SATA 0: Samsung SSD 870 EVO", True),
        ]
        for label, passed in device_checks:
            if random.random() >= 0.8:
                continue  # this device line is left out of the screen
            pen.text((left, cursor_y), label, fill=ok_color if passed else "#FF4444")
            cursor_y += 20

        cursor_y += 20
        pen.text((left, cursor_y), random.choice(POST_ERRORS), fill="#FF0000")
        cursor_y += 25
        pen.text((left, cursor_y), "Press F1 to Resume, F2 to enter SETUP", fill="#FFFFFF")

        screen = screen.resize(IMAGE_SIZE, Image.LANCZOS)
        destination = str(target_dir / f"post_{index:04d}.jpg")
        screen.save(destination, quality=random.randint(80, 95))

    print(f" ✓ Generated {n_samples} POST failure screen images")
|
|
|
|
def generate_thermal_images(output_dir, n_samples=200):
    """Write ``n_samples`` synthetic thermal-warning JPEGs (overheating_fan).

    Each image uses one of three randomly chosen layouts:

    * ``hwmonitor`` – light window listing three sensors, all in the
      critical range (threshold to threshold + 20 °C);
    * ``bios_warning`` – black screen with a red shutdown warning;
    * ``popup`` – dark desktop with a warning box in the lower-right corner.

    Images are drawn at 1920x1080, resized to ``IMAGE_SIZE`` and saved to
    ``output_dir/synth_images/overheating_fan/thermal_NNNN.jpg``.
    """
    print("\n" + "="*60)
    print("PART 4c: Synthetic Thermal Warning Images")
    print("="*60)

    img_out = output_dir / "synth_images" / "overheating_fan"
    img_out.mkdir(parents=True, exist_ok=True)

    # NOTE(review): the "⚠" glyph below is outside Latin-1 and no font is
    # passed to draw.text; whether the default font can render it presumably
    # depends on the installed Pillow version — verify on the target setup.
    for i in range(n_samples):
        style = random.choice(["hwmonitor", "bios_warning", "popup"])
        w, h = 1920, 1080

        if style == "hwmonitor":
            img = Image.new("RGB", (w, h), color="#F0F0F0")
            draw = ImageDraw.Draw(img)
            draw.rectangle([0, 0, w, 40], fill="#0078D4")
            draw.text((10, 10), "HW Monitor - Temperature Critical!", fill="white")
            y = 60
            for name, threshold in [("CPU Package", 85), ("GPU Core", 90), ("CPU VRM", 80)]:
                temp = random.randint(threshold, threshold + 20)
                draw.text((20, y), f"{name}: {temp}°C [CRITICAL] ⚠THERMAL THROTTLING", fill="#FF0000")
                y += 35
        elif style == "bios_warning":
            img = Image.new("RGB", (w, h), color="#000000")
            draw = ImageDraw.Draw(img)
            temp = random.randint(95, 110)
            draw.text((w//4, h//4), "*** WARNING ***", fill="#FF0000")
            draw.text((w//4, h//4+40), f"CPU temperature: {temp}°C", fill="#FF0000")
            draw.text((w//4, h//4+80), "System will shut down in 10 seconds.", fill="#FFAA00")
        else:
            img = Image.new("RGB", (w, h), color="#1E1E1E")
            draw = ImageDraw.Draw(img)
            # Popup box anchored near the bottom-right corner.
            pw, ph = 400, 150
            px, py = w-pw-20, h-ph-60
            draw.rectangle([px, py, px+pw, py+ph], fill="#2D2D2D", outline="#FF6600", width=2)
            temp = random.randint(90, 105)
            # Was a mis-decoded "âš" sequence; restored to the warning sign
            # used by the hwmonitor layout above.
            draw.text((px+15, py+10), "⚠ Critical Temperature Warning", fill="#FF6600")
            draw.text((px+15, py+40), f"CPU temperature: {temp}°C", fill="#FF0000")
            draw.text((px+15, py+65), "Thermal throttling is active.", fill="#FFAA00")

        img = img.resize(IMAGE_SIZE, Image.LANCZOS)
        img.save(str(img_out / f"thermal_{i:04d}.jpg"), quality=random.randint(80, 95))

    print(f" ✓ Generated {n_samples} thermal warning images")
|
|
|
|
def generate_storage_error_images(output_dir, n_samples=200):
    """Write ``n_samples`` synthetic disk-error JPEGs (storage_failure).

    Each image uses one of four randomly chosen layouts (SMART warning,
    plain disk read error, CHKDSK progress, CrystalDiskInfo-like window) at
    a randomly chosen resolution, is resized to ``IMAGE_SIZE`` and saved to
    ``output_dir/synth_images/storage_failure/storage_NNNN.jpg``.
    """
    print("\n" + "="*60)
    print("PART 4d: Synthetic Storage Error Images")
    print("="*60)

    target_dir = output_dir / "synth_images" / "storage_failure"
    target_dir.mkdir(parents=True, exist_ok=True)

    def blank(size, background):
        # Fresh canvas plus the drawing handle bound to it.
        canvas = Image.new("RGB", size, color=background)
        return canvas, ImageDraw.Draw(canvas)

    for index in range(n_samples):
        layout = random.choice(["smart_warning", "disk_error", "chkdsk", "crystaldisk"])
        width, height = random.choice([(1920, 1080), (1280, 1024)])
        size = (width, height)

        if layout == "smart_warning":
            screen, pen = blank(size, "#000000")
            x, y = width//6, height//3
            pen.text((x, y), "WARNING:", fill="#FFAA00")
            pen.text((x, y+40), random.choice(SMART_ERRORS), fill="#FF0000")
            pen.text((x, y+80), "Immediately back up your data.", fill="#FFFFFF")
        elif layout == "disk_error":
            screen, pen = blank(size, "#000000")
            pen.text((20, height//3), "A disk read error occurred", fill="#CCCCCC")
            pen.text((20, height//3+30), "Press Ctrl+Alt+Del to restart", fill="#AAAAAA")
        elif layout == "chkdsk":
            screen, pen = blank(size, "#000000")
            percent = random.randint(5, 95)
            pen.text((20, 20), "Checking file system on C:", fill="#CCCCCC")
            pen.text((20, 50), "The type of the file system is NTFS.", fill="#CCCCCC")
            stage = random.randint(1,5)
            pen.text((20, 100), f"CHKDSK is verifying files (stage {stage} of 5)... {percent}%", fill="#FFFFFF")
            bad_sectors = random.randint(1,500)
            pen.text((20, 140), f"Windows found {bad_sectors} bad sectors.", fill="#FF4444")
        else:
            screen, pen = blank(size, "#FFFFFF")
            pen.rectangle([0, 0, width, 40], fill="#3366CC")
            pen.text((10, 10), "CrystalDiskInfo - Health Status: CAUTION", fill="white")
            health = random.choice(["Caution", "Bad"])
            if health == "Bad":
                health_color = "#FF0000"
            else:
                health_color = "#FFA500"
            pen.text((20, 60), f"Health Status: {health}", fill=health_color)
            reallocated = random.randint(50,2000)
            pen.text((20, 90), f"Reallocated Sectors: {reallocated}", fill="#FF0000")

        screen = screen.resize(IMAGE_SIZE, Image.LANCZOS)
        destination = str(target_dir / f"storage_{index:04d}.jpg")
        screen.save(destination, quality=random.randint(80, 95))

    print(f" ✓ Generated {n_samples} storage error images")
|
|
|
|
def generate_normal_images(output_dir, n_samples=200):
    """Write ``n_samples`` synthetic "healthy desktop" JPEGs (normal_operation).

    Each image is a 1920x1080 wallpaper in a random base colour that darkens
    by up to 40 levels per channel from top to bottom, with a dark taskbar
    and a clock text; with probability 0.4 a white window with a title strip
    is drawn on top. Images are resized to ``IMAGE_SIZE`` and saved to
    ``output_dir/synth_images/normal_operation/normal_NNNN.jpg``.
    """
    print("\n" + "="*60)
    print("PART 4e: Synthetic Normal Desktop Images")
    print("="*60)

    img_out = output_dir / "synth_images" / "normal_operation"
    img_out.mkdir(parents=True, exist_ok=True)

    COLORS = ["#0078D4", "#1B5E20", "#283593", "#1A237E", "#004D40",
              "#311B92", "#880E4F", "#BF360C", "#006064", "#263238"]

    for i in range(n_samples):
        w, h = 1920, 1080
        img = Image.new("RGB", (w, h), color=random.choice(COLORS))
        draw = ImageDraw.Draw(img)

        # Vertical gradient. The canvas is a single solid colour here and
        # each row is only overwritten after it would have been read, so the
        # base colour is read once instead of once per row.
        base_r, base_g, base_b = img.getpixel((0, 0))
        for y_pos in range(h):
            darken = int(40 * y_pos / h)
            row_color = (
                max(0, base_r - darken),
                max(0, base_g - darken),
                max(0, base_b - darken),
            )
            draw.line([(0, y_pos), (w, y_pos)], fill=row_color)

        # Taskbar with a clock in the right corner.
        draw.rectangle([0, h-48, w, h], fill="#1F1F1F")
        draw.text((w-100, h-35), f"{random.randint(1,12):02d}:{random.randint(0,59):02d} PM", fill="#FFFFFF")

        # Optional application window: white body, light-grey title strip.
        if random.random() < 0.4:
            wx, wy = random.randint(100, w//2), random.randint(50, h//3)
            ww, wh = random.randint(400, 800), random.randint(300, 500)
            draw.rectangle([wx, wy, wx+ww, wy+wh], fill="#FFFFFF", outline="#CCCCCC")
            draw.rectangle([wx, wy, wx+ww, wy+30], fill="#F0F0F0")

        img = img.resize(IMAGE_SIZE, Image.LANCZOS)
        img.save(str(img_out / f"normal_{i:04d}.jpg"), quality=random.randint(80, 95))

    print(f" ✓ Generated {n_samples} normal desktop images")
|
|
|
|
| |
| |
| |
|
|
def _collect_class_dir_files(root, patterns, bucket):
    """Append files from ``root/<class_name>/`` sub-directories to ``bucket``.

    Only sub-directories whose name is a key of ``FAULT_NAME_TO_ID`` are
    scanned. Directories and files are visited in sorted order so the result
    does not depend on filesystem enumeration order.

    Args:
        root: Directory holding one sub-directory per fault class; silently
            ignored when it does not exist.
        patterns: Glob patterns applied (in order) inside each class directory.
        bucket: ``dict``-like mapping class name -> list of path strings,
            extended in place.
    """
    if not root.exists():
        return
    for class_dir in sorted(root.iterdir()):
        if not (class_dir.is_dir() and class_dir.name in FAULT_NAME_TO_ID):
            continue
        for pattern in patterns:
            bucket[class_dir.name].extend(str(p) for p in sorted(class_dir.glob(pattern)))


def build_final_dataset(build_dir, max_per_class=None):
    """Combine all sources into a unified dataset with paired audio+image samples.

    Audio is gathered from ``audio``, ``hf_audio`` and ``synth_audio`` (class
    inferred from the file-name prefix) and from ``youtube/audio/<class>``.
    Images are gathered from ``synth_images/<class>`` (jpg and png) and
    ``youtube/frames/<class>`` (jpg). Within a class the shorter modality is
    cycled so every sample has both an audio and an image path whenever at
    least one file of that modality exists; otherwise the path is ``None``.

    The shuffled sample list is written to ``build_dir/dataset_manifest.json``.

    Args:
        build_dir: Build root (``pathlib.Path``).
        max_per_class: Optional cap on samples per class; a falsy value
            (``None`` or ``0``) means no cap.

    Returns:
        The list of sample dicts (``fault_class``, ``fault_name``,
        ``audio_path``, ``image_path``) in shuffled order.
    """
    print("\n" + "="*60)
    print("PART 5: Building Final Dataset")
    print("="*60)

    # --- Audio: flat/recursive folders, class encoded as file-name prefix ---
    audio_by_class = defaultdict(list)
    class_names = tuple(FAULT_NAME_TO_ID)
    for folder in ("audio", "hf_audio", "synth_audio"):
        audio_dir = build_dir / folder
        if not audio_dir.exists():
            continue
        # Sorted so pairing and truncation are reproducible across machines.
        for wav in sorted(audio_dir.rglob("*.wav")):
            stem = wav.stem.lower()
            matched = next((name for name in class_names if stem.startswith(name)), None)
            if matched is not None:
                audio_by_class[matched].append(str(wav))

    # --- Audio: YouTube clips, one sub-directory per class ---
    _collect_class_dir_files(build_dir / "youtube" / "audio", ("*.wav",), audio_by_class)

    # --- Images: synthetic renders, then YouTube frames ---
    image_by_class = defaultdict(list)
    _collect_class_dir_files(build_dir / "synth_images", ("*.jpg", "*.png"), image_by_class)
    _collect_class_dir_files(build_dir / "youtube" / "frames", ("*.jpg",), image_by_class)

    print("\n Audio files per class:")
    for cls in FAULT_CLASSES.values():
        print(f" {cls}: {len(audio_by_class[cls])}")
    print("\n Image files per class:")
    for cls in FAULT_CLASSES.values():
        print(f" {cls}: {len(image_by_class[cls])}")

    # --- Pair audio with images class by class ---
    all_samples = []
    for class_name, class_id in FAULT_NAME_TO_ID.items():
        audios = audio_by_class[class_name]
        images = image_by_class[class_name]
        if not audios and not images:
            print(f" ⚠ No data for {class_name}, skipping")
            continue
        n_pairs = max(len(audios), len(images))
        if max_per_class:
            n_pairs = min(n_pairs, max_per_class)
        for i in range(n_pairs):
            # Modulo indexing recycles the shorter modality.
            all_samples.append({
                "fault_class": class_id,
                "fault_name": class_name,
                "audio_path": audios[i % len(audios)] if audios else None,
                "image_path": images[i % len(images)] if images else None,
            })

    random.shuffle(all_samples)

    print(f"\n Total paired samples: {len(all_samples)}")
    class_dist = Counter(s["fault_name"] for s in all_samples)
    for cls, count in sorted(class_dist.items()):
        print(f" {cls}: {count}")

    manifest_path = build_dir / "dataset_manifest.json"
    manifest = {
        "total_samples": len(all_samples),
        "class_distribution": dict(class_dist),
        "samples": all_samples,
    }
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    print(f"\n ✓ Manifest saved to {manifest_path}")
    return all_samples
|
|
|
|
def _infer_sample_source(path):
    """Classify a manifest path as ``youtube``, ``huggingface`` or ``synthetic``.

    The decision is made on whole path components (either separator style)
    rather than substrings, so a file or parent folder whose name merely
    contains "youtube" is not mislabelled.
    """
    components = path.replace("\\", "/").split("/")
    if "youtube" in components:
        return "youtube"
    if "hf_audio" in components:
        return "huggingface"
    return "synthetic"


def upload_to_hub(build_dir, repo_id="Ellaft/pc-fault-real-dataset"):
    """Upload the built dataset to HuggingFace Hub.

    Reads ``build_dir/dataset_manifest.json``, builds a ``datasets.Dataset``
    with ``audio``, ``image``, ``fault_class``, ``fault_name`` and ``source``
    columns, splits it 70/15/15 into train/validation/test (stratified by
    ``fault_class``, seed 42) and pushes it to ``repo_id`` as a public repo.

    Args:
        build_dir: Build root (``pathlib.Path``) containing the manifest.
        repo_id: Target dataset repository on the Hub.

    Raises:
        FileNotFoundError: If the manifest has not been written yet.
    """
    print("\n" + "="*60)
    print(f"Uploading to {repo_id}")
    print("="*60)

    # Imported lazily so the rest of the script works without `datasets`.
    from datasets import Audio, ClassLabel, Dataset, DatasetDict
    from datasets import Image as HFImage

    manifest_path = build_dir / "dataset_manifest.json"
    with open(manifest_path, encoding="utf-8") as f:
        manifest = json.load(f)

    data = {"audio": [], "image": [], "fault_class": [], "fault_name": [], "source": []}
    for s in manifest["samples"]:
        audio_path = s.get("audio_path")
        image_path = s.get("image_path")
        # Audio decides the source; fall back to the image for audio-less samples.
        origin = audio_path or image_path
        data["fault_class"].append(s["fault_class"])
        data["fault_name"].append(s["fault_name"])
        data["source"].append(_infer_sample_source(origin) if origin else "synthetic")
        data["audio"].append(audio_path or None)
        data["image"].append(image_path)

    ds = Dataset.from_dict(data)
    ds = ds.cast_column("audio", Audio(sampling_rate=SAMPLE_RATE))
    ds = ds.cast_column("image", HFImage())
    # train_test_split(stratify_by_column=...) only accepts ClassLabel columns;
    # a plain integer column raises ValueError. Assumes FAULT_CLASSES keys are
    # the contiguous ids 0..N-1 so that list position equals class id.
    label_names = [FAULT_CLASSES[class_id] for class_id in sorted(FAULT_CLASSES)]
    ds = ds.cast_column("fault_class", ClassLabel(names=label_names))

    # 70% train, then the remaining 30% is halved into validation and test.
    ds = ds.train_test_split(test_size=0.3, seed=42, stratify_by_column="fault_class")
    test_val = ds["test"].train_test_split(test_size=0.5, seed=42, stratify_by_column="fault_class")

    final_ds = DatasetDict({"train": ds["train"], "validation": test_val["train"], "test": test_val["test"]})
    print(f" Train: {len(final_ds['train'])}, Val: {len(final_ds['validation'])}, Test: {len(final_ds['test'])}")

    final_ds.push_to_hub(repo_id, private=False)
    print(f" ✓ Uploaded to https://huggingface.co/datasets/{repo_id}")
|
|
|
|
| |
| |
| |
|
|
def main(argv=None):
    """Command-line entry point: run the selected build stages in order.

    Stages: YouTube scraping, HuggingFace downloads, synthetic audio,
    synthetic images, manifest assembly and (optionally) upload to the Hub.
    ``--youtube_only`` stops after the scraping stage.

    Args:
        argv: Optional list of argument strings; ``None`` (the default) makes
            argparse read ``sys.argv[1:]`` as before.
    """
    parser = argparse.ArgumentParser(description="PC Fault Detection Dataset Builder")
    parser.add_argument("--output_dir", default="./dataset_build")
    # The two YouTube switches contradict each other; previously passing both
    # skipped the scrape and still reported it as complete.
    youtube_mode = parser.add_mutually_exclusive_group()
    youtube_mode.add_argument("--skip_youtube", action="store_true", help="Skip YouTube scraping")
    youtube_mode.add_argument("--youtube_only", action="store_true", help="Only do YouTube scraping")
    parser.add_argument("--skip_hf", action="store_true", help="Skip HuggingFace dataset downloads")
    parser.add_argument("--skip_synth_audio", action="store_true")
    parser.add_argument("--skip_synth_images", action="store_true")
    parser.add_argument("--max_per_class", type=int, default=300)
    parser.add_argument("--max_yt_videos", type=int, default=3, help="Max YouTube videos per search query")
    parser.add_argument("--upload", action="store_true", help="Upload to HuggingFace Hub")
    parser.add_argument("--hub_repo", default="Ellaft/pc-fault-real-dataset")
    args = parser.parse_args(argv)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print("="*60)
    print("PC Fault Detection — Dataset Builder")
    print("="*60)
    print(f"Output: {output_dir}")
    print(f"Max per class: {args.max_per_class}")

    # Stage 1: YouTube scraping
    if not args.skip_youtube:
        run_youtube_scraper(output_dir / "youtube", max_videos_per_query=args.max_yt_videos)

    if args.youtube_only:
        print("\n✓ YouTube scraping complete.")
        return

    # Stage 2: HuggingFace datasets
    if not args.skip_hf:
        download_cooling_fans(output_dir, max_per_class=args.max_per_class)
        try:
            download_fsd50k_relevant(output_dir, max_per_class=args.max_per_class // 3)
        except Exception as e:
            # Best effort: a failed FSD50K download must not abort the build.
            print(f" ⚠ FSD50K skipped: {e}")

    # Stage 3: synthetic audio
    if not args.skip_synth_audio:
        generate_beep_codes(output_dir, n_per_pattern=args.max_per_class // 12)
        generate_crash_audio(output_dir, n_samples=args.max_per_class)
        generate_hdd_click_audio(output_dir, n_samples=args.max_per_class)

    # Stage 4: synthetic images
    if not args.skip_synth_images:
        generate_bsod_images(output_dir, n_samples=args.max_per_class)
        generate_post_screens(output_dir, n_samples=args.max_per_class)
        generate_thermal_images(output_dir, n_samples=args.max_per_class)
        generate_storage_error_images(output_dir, n_samples=args.max_per_class)
        generate_normal_images(output_dir, n_samples=args.max_per_class)

    # Stage 5: pair everything up and write the manifest
    build_final_dataset(output_dir, max_per_class=args.max_per_class)

    # Stage 6: optional upload
    if args.upload:
        upload_to_hub(output_dir, repo_id=args.hub_repo)

    print("\n" + "="*60)
    print("✓ Dataset build complete!")
    print(f" Manifest: {output_dir}/dataset_manifest.json")
    if not args.upload:
        # Only suggest the upload command when this run did not upload.
        print(" To upload: python build_dataset.py --upload")
    print("="*60)


if __name__ == "__main__":
    main()
|
|