Spaces:
Running
Running
| import torch | |
| import torchvision | |
| from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights | |
| from PIL import Image, ImageDraw | |
| import os | |
| import math | |
| # Map COCO classes to our Hackathon targets | |
| TARGET_CLASSES = { | |
| 1: 'Person', | |
| 2: 'Bicycle', | |
| 3: 'Car', | |
| 4: 'Motorcycle' | |
| } | |
| def load_perception_model(): | |
| print("[System] Loading Faster R-CNN (ResNet-50-FPN Backbone)...") | |
| weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT | |
| model = fasterrcnn_resnet50_fpn(weights=weights, progress=False) | |
| model.eval() | |
| return model, weights | |
| def extract_features(img_path, model, weights, score_threshold=0.7): | |
| image = Image.open(img_path).convert("RGB") | |
| preprocess = weights.transforms() | |
| input_batch = preprocess(image).unsqueeze(0) | |
| with torch.no_grad(): | |
| prediction = model(input_batch)[0] | |
| extracted = [] | |
| for i, box in enumerate(prediction['boxes']): | |
| score = prediction['scores'][i].item() | |
| label = prediction['labels'][i].item() | |
| if score > score_threshold and label in TARGET_CLASSES: | |
| box = box.tolist() | |
| class_name = TARGET_CLASSES[label] | |
| # Get bottom-center coordinate for BEV mapping | |
| center_x = (box[0] + box[2]) / 2.0 | |
| bottom_y = box[3] | |
| extracted.append({ | |
| 'type': class_name, | |
| 'bbox': box, | |
| 'coord': (center_x, bottom_y) | |
| }) | |
| return extracted, image | |
| def calculate_distance(c1, c2): | |
| return math.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2) | |
| def process_frame_sequence(frame1_path, frame2_path, model, weights): | |
| """ | |
| Takes 2 sequential frames, detects objects, matches them to find movement, | |
| and bridges the data to the AI Brain. | |
| """ | |
| print(f"\n[Step 1] Analyzing Frame T-1: {os.path.basename(frame1_path)}") | |
| objs_f1, img1 = extract_features(frame1_path, model, weights) | |
| print(f"[Step 2] Analyzing Frame T0: {os.path.basename(frame2_path)}") | |
| objs_f2, img2 = extract_features(frame2_path, model, weights) | |
| print("\n[Step 3] Temporal Tracking (Finding Moving Cyclists/Pedestrians)") | |
| tracked_history = [] | |
| # Simple Tracking by linking nearest objects between Frame 1 and Frame 2 | |
| for obj2 in objs_f2: | |
| best_match = None | |
| min_dist = float('inf') | |
| for obj1 in objs_f1: | |
| if obj1['type'] == obj2['type']: # Must be same class | |
| dist = calculate_distance(obj1['coord'], obj2['coord']) | |
| if dist < 50.0: # Max pixel movement threshold between 2 frames | |
| min_dist = dist | |
| best_match = obj1 | |
| if best_match: | |
| # Calculate movement vector (Velocity) | |
| dx = obj2['coord'][0] - best_match['coord'][0] | |
| dy = obj2['coord'][1] - best_match['coord'][1] | |
| is_moving = abs(dx) > 1.0 or abs(dy) > 1.0 | |
| if is_moving and obj2['type'] in ['Person', 'Bicycle']: | |
| print(f" -> Spotted Moving {obj2['type']}! dx: {dx:.2f}, dy: {dy:.2f}") | |
| # Format: [(x_t-1, y_t-1), (x_t0, y_t0)] | |
| # This is EXACTLY what the AI Brain needs! | |
| history = [best_match['coord'], obj2['coord']] | |
| tracked_history.append({ | |
| "type": obj2['type'], | |
| "history": history | |
| }) | |
| print(f"\n[Step 4] Handoff to AI Brain: Found {len(tracked_history)} moving VRUs.") | |
| return tracked_history | |
| if __name__ == '__main__': | |
| # We will use two identical images to simulate the script architecture | |
| # In reality, this would be image_001.jpg and image_002.jpg | |
| import glob | |
| cam_front_images = glob.glob("DataSet/samples/CAM_FRONT/*.jpg") | |
| if len(cam_front_images) >= 2: | |
| f1 = cam_front_images[0] | |
| f2 = cam_front_images[1] # Next sequential frame | |
| try: | |
| model, weights = load_perception_model() | |
| vru_data_for_ai = process_frame_sequence(f1, f2, model, weights) | |
| print("\n--- FINAL JSON PAYLOAD FOR TRANSFORMER MODEL ---") | |
| for person in vru_data_for_ai: | |
| print(f"Target: {person['type']}") | |
| print(f"Historical Trajectory [T-1, T0]: {person['history']}") | |
| except Exception as e: | |
| print("Model not loaded, but script structure is ready.") | |