ppe-training-scripts / train_ppe_fixed.py
baskarmother's picture
Upload train_ppe_fixed.py
ab48c24 verified
#!/usr/bin/env python3
"""
PPE Compliance Detection Training - FIXED VERSION
- Swaps opencv-python for opencv-python-headless before importing ultralytics
- Downloads keremberke dataset as ZIP files (script-based datasets no longer supported)
- Uses model.train() return value correctly (no results.best)
- Pushes best.pt to HuggingFace Hub after training
"""
import subprocess
import sys
import os
# FIX: opencv-python needs libGL which is missing in container; use headless instead
print("[0/5] Swapping opencv-python for opencv-python-headless...")
subprocess.run([sys.executable, "-m", "pip", "uninstall", "-y", "opencv-python"],
capture_output=True)
subprocess.run([sys.executable, "-m", "pip", "install", "--quiet", "opencv-python-headless"],
capture_output=True)
print(" Done")
import zipfile
import shutil
import json
from pathlib import Path
from huggingface_hub import hf_hub_download, HfApi
from PIL import Image
import yaml
HF_USERNAME = "baskarmother"
MODEL_ID = "yolov8s-ppe-construction-v2"
DATASET_DIR = Path("/app/combined_ppe_dataset")
EPOCHS = 150
IMG_SIZE = 640
BATCH = 16
DEVICE = "0"
UNIFIED_CLASSES = [
"person", "helmet", "vest", "mask", "gloves",
"safety_shoe", "goggles", "no_helmet", "no_mask",
"no_vest", "head", "barricade", "dumpster",
"excavators", "safety_net", "dump_truck", "truck", "wheel_loader",
]
def download_ppe_dataset():
print("[1/5] Downloading 51ddhesh/PPE_Detection...")
zip_path = hf_hub_download(
repo_id="51ddhesh/PPE_Detection",
filename="PPE.zip",
repo_type="dataset",
cache_dir="/app/hf_cache",
local_dir="/app/downloads",
)
extract_dir = Path("/app/downloads/ppe_dataset")
extract_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(extract_dir)
print(f" Extracted to {extract_dir}")
return extract_dir
def download_keremberke_dataset():
print("[2/5] Downloading keremberke/construction-safety-object-detection...")
download_dir = Path("/app/downloads/keremberke")
download_dir.mkdir(parents=True, exist_ok=True)
for split_file in ["data/train.zip", "data/valid.zip", "data/test.zip"]:
try:
path = hf_hub_download(
repo_id="keremberke/construction-safety-object-detection",
filename=split_file,
repo_type="dataset",
cache_dir="/app/hf_cache",
local_dir=str(download_dir),
)
extract_to = download_dir / split_file.replace("data/", "").replace(".zip", "")
extract_to.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(path, 'r') as zf:
zf.extractall(extract_to)
print(f" Downloaded and extracted {split_file}")
except Exception as e:
print(f" Warning: Could not download {split_file}: {e}")
return download_dir
def convert_keremberke_to_yolo(raw_dir: Path, output_dir: Path):
print("[3/5] Converting keremberke dataset to YOLO format...")
class_map = {
"person": 0, "hardhat": 1, "mask": 3,
"no-hardhat": 7, "no-mask": 8, "no-safety vest": 9,
"gloves": 4, "safety shoes": 5, "safety vest": 2,
"barricade": 11, "dumpster": 12, "excavators": 13,
"safety net": 14, "dump truck": 15,
"mini-van": 0, "truck": 16, "wheel loader": 17,
}
for split in ["train", "valid", "test"]:
images_dir = output_dir / split / "images"
labels_dir = output_dir / split / "labels"
images_dir.mkdir(parents=True, exist_ok=True)
labels_dir.mkdir(parents=True, exist_ok=True)
raw_split_dir = raw_dir / split
if not raw_split_dir.exists():
print(f" WARNING: {raw_split_dir} not found, skipping")
continue
json_files = list(raw_split_dir.rglob("*.json"))
print(f" {split}: Found {len(json_files)} JSON files")
if not json_files:
img_files = []
for ext in ["*.jpg", "*.jpeg", "*.png"]:
img_files.extend(raw_split_dir.rglob(ext))
for img_path in img_files:
shutil.copy2(img_path, images_dir / f"keremberke_{img_path.name}")
print(f" {split}: Copied {len(img_files)} images (no labels)")
continue
for coco_file in json_files:
with open(coco_file) as f:
coco_data = json.load(f)
image_id_to_file = {}
image_id_to_size = {}
for img in coco_data.get("images", []):
image_id_to_file[img["id"]] = img["file_name"]
image_id_to_size[img["id"]] = (img.get("width", 640), img.get("height", 640))
cat_id_to_name = {}
for cat in coco_data.get("categories", []):
cat_id_to_name[cat["id"]] = cat["name"]
anns_by_img = {}
for ann in coco_data.get("annotations", []):
anns_by_img.setdefault(ann["image_id"], []).append(ann)
all_images = {}
for ext in ["*.jpg", "*.jpeg", "*.png"]:
for p in raw_split_dir.rglob(ext):
all_images[p.name] = p
processed = 0
for img_id, filename in image_id_to_file.items():
img_path = all_images.get(filename)
if not img_path:
continue
out_name = f"keremberke_{filename}"
shutil.copy2(img_path, images_dir / out_name)
w, h = image_id_to_size.get(img_id, (640, 640))
label_path = labels_dir / f"{out_name.rsplit('.', 1)[0]}.txt"
with open(label_path, "w") as f:
for ann in anns_by_img.get(img_id, []):
cat_name = cat_id_to_name.get(ann["category_id"], "")
if cat_name not in class_map:
continue
cls = class_map[cat_name]
x, y, bw, bh = ann["bbox"]
xc = (x + bw / 2) / w
yc = (y + bh / 2) / h
nw = bw / w
nh = bh / h
xc = max(0, min(1, xc))
yc = max(0, min(1, yc))
nw = max(0, min(1, nw))
nh = max(0, min(1, nh))
f.write(f"{cls} {xc:.6f} {yc:.6f} {nw:.6f} {nh:.6f}\n")
processed += 1
print(f" {split}: Processed {processed} images from {coco_file.name}")
print(f" Converted to {output_dir}")
def merge_datasets(ppe_extract_dir: Path, keremberke_dir: Path, output_dir: Path):
print("[4/5] Merging datasets...")
output_dir.mkdir(parents=True, exist_ok=True)
ppe_dir = None
for candidate in [ppe_extract_dir / "PPE", ppe_extract_dir / "ppe", ppe_extract_dir]:
if (candidate / "train" / "images").exists():
ppe_dir = candidate
break
if ppe_dir is None:
print(" ERROR: Could not find PPE dataset structure")
os._exit(1)
print(f" Found PPE dataset at: {ppe_dir}")
ppe_class_map = {0: 2, 1: 5, 2: 3, 3: 1, 4: 6, 5: 4}
for split in ["train", "valid", "test"]:
out_images = output_dir / split / "images"
out_labels = output_dir / split / "labels"
out_images.mkdir(parents=True, exist_ok=True)
out_labels.mkdir(parents=True, exist_ok=True)
ppe_images = ppe_dir / split / "images"
ppe_labels = ppe_dir / split / "labels"
if ppe_images.exists():
for img_file in sorted(ppe_images.iterdir()):
if img_file.suffix.lower() not in [".jpg", ".jpeg", ".png"]:
continue
shutil.copy2(img_file, out_images / f"ppe_{img_file.name}")
label_file = ppe_labels / f"{img_file.stem}.txt"
if label_file.exists():
with open(label_file) as f:
lines = f.readlines()
remapped = []
for line in lines:
parts = line.strip().split()
if len(parts) < 5:
continue
src_cls = int(parts[0])
if src_cls in ppe_class_map:
remapped.append(f"{ppe_class_map[src_cls]} {' '.join(parts[1:])}\n")
out_label = out_labels / f"ppe_{img_file.stem}.txt"
with open(out_label, "w") as f:
f.writelines(remapped)
k_images = keremberke_dir / split / "images"
k_labels = keremberke_dir / split / "labels"
if k_images.exists():
for img_file in sorted(k_images.iterdir()):
shutil.copy2(img_file, out_images / img_file.name)
for label_file in sorted(k_labels.iterdir()):
shutil.copy2(label_file, out_labels / label_file.name)
data_yaml = {
"path": str(output_dir.absolute()),
"train": "train/images",
"val": "valid/images",
"test": "test/images",
"names": {i: name for i, name in enumerate(UNIFIED_CLASSES)},
"nc": len(UNIFIED_CLASSES),
}
with open(output_dir / "data.yaml", "w") as f:
yaml.dump(data_yaml, f, default_flow_style=False)
for split in ["train", "valid", "test"]:
n = len(list((output_dir / split / "images").glob("*")))
print(f" {split}: {n} images")
def train_model(data_yaml_path: Path):
print("[5/5] Training YOLOv8s...")
from ultralytics import YOLO
model = YOLO("yolov8s.pt")
model.train(
data=str(data_yaml_path),
epochs=EPOCHS,
imgsz=IMG_SIZE,
batch=BATCH,
device=DEVICE,
patience=30,
project="/app/runs",
name="ppe_improved",
exist_ok=True,
pretrained=True,
optimizer="SGD",
lr0=0.01,
lrf=0.01,
momentum=0.9,
weight_decay=0.0005,
augment=True,
mosaic=1.0,
hsv_h=0.015,
hsv_s=0.7,
hsv_v=0.4,
degrees=5.0,
translate=0.1,
scale=0.5,
shear=2.0,
perspective=0.0,
flipud=0.0,
fliplr=0.5,
)
print(" Training complete!")
best_model = Path("/app/runs/ppe_improved/weights/best.pt")
print(f" Best model saved at: {best_model} (exists={best_model.exists()})")
return best_model
def push_to_hub(best_model_path: Path):
print("Pushing model to HuggingFace Hub...")
api = HfApi()
repo_id = f"{HF_USERNAME}/{MODEL_ID}"
try:
api.create_repo(repo_id=repo_id, repo_type="model", exist_ok=True)
except Exception as e:
print(f" Repo info: {e}")
api.upload_file(
path_or_fileobj=str(best_model_path),
path_in_repo="best.pt",
repo_id=repo_id,
repo_type="model",
)
readme = f"""---
license: cc-by-4.0
library_name: ultralytics
tags:
- object-detection
- ppe
- construction-safety
- yolov8
---
# {MODEL_ID}
Improved PPE Compliance Detection Model for Construction Sites (v2)
## Classes ({len(UNIFIED_CLASSES)})
{chr(10).join(f"- {i}: {name}" for i, name in enumerate(UNIFIED_CLASSES))}
## Usage
```python
from ultralytics import YOLO
model = YOLO("hf://{repo_id}/best.pt")
results = model.predict("image.jpg")
```
## Training Details
- Base Model: YOLOv8s
- Epochs: {EPOCHS}
- Image Size: {IMG_SIZE}x{IMG_SIZE}
- Batch Size: {BATCH}
"""
api.upload_file(
path_or_fileobj=readme.encode(),
path_in_repo="README.md",
repo_id=repo_id,
repo_type="model",
)
print(f" Model pushed to https://huggingface.co/{repo_id}")
def main():
print("=" * 60)
print("IMPROVED PPE DETECTION TRAINING (FIXED)")
print("=" * 60)
ppe_dir = download_ppe_dataset()
keremberke_raw = download_keremberke_dataset()
keremberke_yolo = Path("/app/keremberke_yolo")
convert_keremberke_to_yolo(keremberke_raw, keremberke_yolo)
DATASET_DIR.mkdir(parents=True, exist_ok=True)
merge_datasets(ppe_dir, keremberke_yolo, DATASET_DIR)
best_model = train_model(DATASET_DIR / "data.yaml")
if best_model.exists():
push_to_hub(best_model)
else:
print(f" WARNING: Best model not found at {best_model}")
for pt in Path("/app/runs").rglob("best.pt"):
push_to_hub(pt)
break
print("=" * 60)
print("DONE!")
print("=" * 60)
if __name__ == "__main__":
main()