IntentDrive / backend /scripts /data /build_dataset_from_images.py
sajith-0701
Deploy FastAPI backend to HF Spaces (Docker SDK)
98075af
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from PIL import Image
import os
import glob
import math
import json
TARGET_CLASSES = {1: 'Person', 2: 'Bicycle', 3: 'Car', 4: 'Motorcycle'}
# Set up GPU acceleration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def load_perception_model():
print(f"[System] Loading Pre-Trained Faster R-CNN (ResNet-50-FPN) on {device.type.upper()}...")
weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
model = fasterrcnn_resnet50_fpn(weights=weights, progress=False)
model.to(device) # Move model to GPU
model.eval()
return model, weights
def extract_features(img_path, model, weights, score_threshold=0.7):
image = Image.open(img_path).convert("RGB")
preprocess = weights.transforms()
# Move the image tensor to the GPU so the math runs on CUDA
input_batch = preprocess(image).unsqueeze(0).to(device)
with torch.no_grad():
prediction = model(input_batch)[0]
extracted = []
# prediction items are on GPU, so we use .item() to pull the raw number back out
for i, box in enumerate(prediction['boxes']):
score = prediction['scores'][i].item()
label = prediction['labels'][i].item()
if score > score_threshold and label in TARGET_CLASSES:
center_x = (box[0] + box[2]).item() / 2.0
bottom_y = box[3].item()
extracted.append({
'type': TARGET_CLASSES[label],
'coord': (round(center_x, 2), round(bottom_y, 2))
})
return extracted
def process_dataset_into_trajectories():
print("="*60)
print(f"| Starting Automated Dataset Pre-Processing Pipeline |")
print(f"| Hardware Acceleration: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU'} |")
print("="*60)
model, weights = load_perception_model()
# Get images chronologically to simulate a video feed
image_paths = sorted(glob.glob("DataSet/samples/CAM_FRONT/*.jpg"))
if not image_paths:
print("[!] No images found to process.")
return
print(f"[System] Success: Found a total of {len(image_paths)} valid image frames in the folder. Processing now...")
dataset_trajectories = []
# We need 4 frames of history for our AI Model (T-3, T-2, T-1, T0)
for i in range(len(image_paths) - 3):
frames = image_paths[i:i+4]
frame_data = []
# Output progress every 50 frames
if i % 50 == 0:
print(f" -> Processing frame sequence {i}/{len(image_paths)}")
for f in frames:
objs = extract_features(f, model, weights)
frame_data.append(objs)
for obj_t0 in frame_data[0]:
target_type = obj_t0['type']
track_history = [obj_t0['coord']]
valid_track = True
last_coord = obj_t0['coord']
for t in range(1, 4):
best_dist = float('inf')
best_coord = None
for obj_t_next in frame_data[t]:
if obj_t_next['type'] == target_type:
dist = math.sqrt((last_coord[0] - obj_t_next['coord'][0])**2 +
(last_coord[1] - obj_t_next['coord'][1])**2)
if dist < 60.0 and dist < best_dist:
best_dist = dist
best_coord = obj_t_next['coord']
if best_coord:
track_history.append(best_coord)
last_coord = best_coord
else:
valid_track = False
break
if valid_track:
dataset_trajectories.append({
"agent_type": target_type,
"trajectory_pixels": track_history
})
output_file = "extracted_training_data.json"
with open(output_file, "w") as f:
json.dump(dataset_trajectories, f, indent=4)
print(f"\n[Success] Pipeline Complete!")
print(f"[+] Extracted {len(dataset_trajectories)} valid moving trajectories from raw images.")
print(f"[+] Saved AI Training payload to: {output_file}")
if __name__ == '__main__':
try:
process_dataset_into_trajectories()
except Exception as e:
print(f"Error during processing: {e}")