#!/usr/bin/env python3 """ PPE Compliance Detection Training - FIXED VERSION - Swaps opencv-python for opencv-python-headless before importing ultralytics - Downloads keremberke dataset as ZIP files (script-based datasets no longer supported) - Uses model.train() return value correctly (no results.best) - Pushes best.pt to HuggingFace Hub after training """ import subprocess import sys import os # FIX: opencv-python needs libGL which is missing in container; use headless instead print("[0/5] Swapping opencv-python for opencv-python-headless...") subprocess.run([sys.executable, "-m", "pip", "uninstall", "-y", "opencv-python"], capture_output=True) subprocess.run([sys.executable, "-m", "pip", "install", "--quiet", "opencv-python-headless"], capture_output=True) print(" Done") import zipfile import shutil import json from pathlib import Path from huggingface_hub import hf_hub_download, HfApi from PIL import Image import yaml HF_USERNAME = "baskarmother" MODEL_ID = "yolov8s-ppe-construction-v2" DATASET_DIR = Path("/app/combined_ppe_dataset") EPOCHS = 150 IMG_SIZE = 640 BATCH = 16 DEVICE = "0" UNIFIED_CLASSES = [ "person", "helmet", "vest", "mask", "gloves", "safety_shoe", "goggles", "no_helmet", "no_mask", "no_vest", "head", "barricade", "dumpster", "excavators", "safety_net", "dump_truck", "truck", "wheel_loader", ] def download_ppe_dataset(): print("[1/5] Downloading 51ddhesh/PPE_Detection...") zip_path = hf_hub_download( repo_id="51ddhesh/PPE_Detection", filename="PPE.zip", repo_type="dataset", cache_dir="/app/hf_cache", local_dir="/app/downloads", ) extract_dir = Path("/app/downloads/ppe_dataset") extract_dir.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(zip_path, 'r') as zf: zf.extractall(extract_dir) print(f" Extracted to {extract_dir}") return extract_dir def download_keremberke_dataset(): print("[2/5] Downloading keremberke/construction-safety-object-detection...") download_dir = Path("/app/downloads/keremberke") download_dir.mkdir(parents=True, exist_ok=True) for split_file in ["data/train.zip", "data/valid.zip", "data/test.zip"]: try: path = hf_hub_download( repo_id="keremberke/construction-safety-object-detection", filename=split_file, repo_type="dataset", cache_dir="/app/hf_cache", local_dir=str(download_dir), ) extract_to = download_dir / split_file.replace("data/", "").replace(".zip", "") extract_to.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(path, 'r') as zf: zf.extractall(extract_to) print(f" Downloaded and extracted {split_file}") except Exception as e: print(f" Warning: Could not download {split_file}: {e}") return download_dir def convert_keremberke_to_yolo(raw_dir: Path, output_dir: Path): print("[3/5] Converting keremberke dataset to YOLO format...") class_map = { "person": 0, "hardhat": 1, "mask": 3, "no-hardhat": 7, "no-mask": 8, "no-safety vest": 9, "gloves": 4, "safety shoes": 5, "safety vest": 2, "barricade": 11, "dumpster": 12, "excavators": 13, "safety net": 14, "dump truck": 15, "mini-van": 0, "truck": 16, "wheel loader": 17, } for split in ["train", "valid", "test"]: images_dir = output_dir / split / "images" labels_dir = output_dir / split / "labels" images_dir.mkdir(parents=True, exist_ok=True) labels_dir.mkdir(parents=True, exist_ok=True) raw_split_dir = raw_dir / split if not raw_split_dir.exists(): print(f" WARNING: {raw_split_dir} not found, skipping") continue json_files = list(raw_split_dir.rglob("*.json")) print(f" {split}: Found {len(json_files)} JSON files") if not json_files: img_files = [] for ext in ["*.jpg", "*.jpeg", "*.png"]: img_files.extend(raw_split_dir.rglob(ext)) for img_path in img_files: shutil.copy2(img_path, images_dir / f"keremberke_{img_path.name}") print(f" {split}: Copied {len(img_files)} images (no labels)") continue for coco_file in json_files: with open(coco_file) as f: coco_data = json.load(f) image_id_to_file = {} image_id_to_size = {} for img in coco_data.get("images", []): image_id_to_file[img["id"]] = img["file_name"] image_id_to_size[img["id"]] = (img.get("width", 640), img.get("height", 640)) cat_id_to_name = {} for cat in coco_data.get("categories", []): cat_id_to_name[cat["id"]] = cat["name"] anns_by_img = {} for ann in coco_data.get("annotations", []): anns_by_img.setdefault(ann["image_id"], []).append(ann) all_images = {} for ext in ["*.jpg", "*.jpeg", "*.png"]: for p in raw_split_dir.rglob(ext): all_images[p.name] = p processed = 0 for img_id, filename in image_id_to_file.items(): img_path = all_images.get(filename) if not img_path: continue out_name = f"keremberke_{filename}" shutil.copy2(img_path, images_dir / out_name) w, h = image_id_to_size.get(img_id, (640, 640)) label_path = labels_dir / f"{out_name.rsplit('.', 1)[0]}.txt" with open(label_path, "w") as f: for ann in anns_by_img.get(img_id, []): cat_name = cat_id_to_name.get(ann["category_id"], "") if cat_name not in class_map: continue cls = class_map[cat_name] x, y, bw, bh = ann["bbox"] xc = (x + bw / 2) / w yc = (y + bh / 2) / h nw = bw / w nh = bh / h xc = max(0, min(1, xc)) yc = max(0, min(1, yc)) nw = max(0, min(1, nw)) nh = max(0, min(1, nh)) f.write(f"{cls} {xc:.6f} {yc:.6f} {nw:.6f} {nh:.6f}\n") processed += 1 print(f" {split}: Processed {processed} images from {coco_file.name}") print(f" Converted to {output_dir}") def merge_datasets(ppe_extract_dir: Path, keremberke_dir: Path, output_dir: Path): print("[4/5] Merging datasets...") output_dir.mkdir(parents=True, exist_ok=True) ppe_dir = None for candidate in [ppe_extract_dir / "PPE", ppe_extract_dir / "ppe", ppe_extract_dir]: if (candidate / "train" / "images").exists(): ppe_dir = candidate break if ppe_dir is None: print(" ERROR: Could not find PPE dataset structure") os._exit(1) print(f" Found PPE dataset at: {ppe_dir}") ppe_class_map = {0: 2, 1: 5, 2: 3, 3: 1, 4: 6, 5: 4} for split in ["train", "valid", "test"]: out_images = output_dir / split / "images" out_labels = output_dir / split / "labels" out_images.mkdir(parents=True, exist_ok=True) out_labels.mkdir(parents=True, exist_ok=True) ppe_images = ppe_dir / split / "images" ppe_labels = ppe_dir / split / "labels" if ppe_images.exists(): for img_file in sorted(ppe_images.iterdir()): if img_file.suffix.lower() not in [".jpg", ".jpeg", ".png"]: continue shutil.copy2(img_file, out_images / f"ppe_{img_file.name}") label_file = ppe_labels / f"{img_file.stem}.txt" if label_file.exists(): with open(label_file) as f: lines = f.readlines() remapped = [] for line in lines: parts = line.strip().split() if len(parts) < 5: continue src_cls = int(parts[0]) if src_cls in ppe_class_map: remapped.append(f"{ppe_class_map[src_cls]} {' '.join(parts[1:])}\n") out_label = out_labels / f"ppe_{img_file.stem}.txt" with open(out_label, "w") as f: f.writelines(remapped) k_images = keremberke_dir / split / "images" k_labels = keremberke_dir / split / "labels" if k_images.exists(): for img_file in sorted(k_images.iterdir()): shutil.copy2(img_file, out_images / img_file.name) for label_file in sorted(k_labels.iterdir()): shutil.copy2(label_file, out_labels / label_file.name) data_yaml = { "path": str(output_dir.absolute()), "train": "train/images", "val": "valid/images", "test": "test/images", "names": {i: name for i, name in enumerate(UNIFIED_CLASSES)}, "nc": len(UNIFIED_CLASSES), } with open(output_dir / "data.yaml", "w") as f: yaml.dump(data_yaml, f, default_flow_style=False) for split in ["train", "valid", "test"]: n = len(list((output_dir / split / "images").glob("*"))) print(f" {split}: {n} images") def train_model(data_yaml_path: Path): print("[5/5] Training YOLOv8s...") from ultralytics import YOLO model = YOLO("yolov8s.pt") model.train( data=str(data_yaml_path), epochs=EPOCHS, imgsz=IMG_SIZE, batch=BATCH, device=DEVICE, patience=30, project="/app/runs", name="ppe_improved", exist_ok=True, pretrained=True, optimizer="SGD", lr0=0.01, lrf=0.01, momentum=0.9, weight_decay=0.0005, augment=True, mosaic=1.0, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, degrees=5.0, translate=0.1, scale=0.5, shear=2.0, perspective=0.0, flipud=0.0, fliplr=0.5, ) print(" Training complete!") best_model = Path("/app/runs/ppe_improved/weights/best.pt") print(f" Best model saved at: {best_model} (exists={best_model.exists()})") return best_model def push_to_hub(best_model_path: Path): print("Pushing model to HuggingFace Hub...") api = HfApi() repo_id = f"{HF_USERNAME}/{MODEL_ID}" try: api.create_repo(repo_id=repo_id, repo_type="model", exist_ok=True) except Exception as e: print(f" Repo info: {e}") api.upload_file( path_or_fileobj=str(best_model_path), path_in_repo="best.pt", repo_id=repo_id, repo_type="model", ) readme = f"""--- license: cc-by-4.0 library_name: ultralytics tags: - object-detection - ppe - construction-safety - yolov8 --- # {MODEL_ID} Improved PPE Compliance Detection Model for Construction Sites (v2) ## Classes ({len(UNIFIED_CLASSES)}) {chr(10).join(f"- {i}: {name}" for i, name in enumerate(UNIFIED_CLASSES))} ## Usage ```python from ultralytics import YOLO model = YOLO("hf://{repo_id}/best.pt") results = model.predict("image.jpg") ``` ## Training Details - Base Model: YOLOv8s - Epochs: {EPOCHS} - Image Size: {IMG_SIZE}x{IMG_SIZE} - Batch Size: {BATCH} """ api.upload_file( path_or_fileobj=readme.encode(), path_in_repo="README.md", repo_id=repo_id, repo_type="model", ) print(f" Model pushed to https://huggingface.co/{repo_id}") def main(): print("=" * 60) print("IMPROVED PPE DETECTION TRAINING (FIXED)") print("=" * 60) ppe_dir = download_ppe_dataset() keremberke_raw = download_keremberke_dataset() keremberke_yolo = Path("/app/keremberke_yolo") convert_keremberke_to_yolo(keremberke_raw, keremberke_yolo) DATASET_DIR.mkdir(parents=True, exist_ok=True) merge_datasets(ppe_dir, keremberke_yolo, DATASET_DIR) best_model = train_model(DATASET_DIR / "data.yaml") if best_model.exists(): push_to_hub(best_model) else: print(f" WARNING: Best model not found at {best_model}") for pt in Path("/app/runs").rglob("best.pt"): push_to_hub(pt) break print("=" * 60) print("DONE!") print("=" * 60) if __name__ == "__main__": main()