# File size: 4,542 Bytes
# 98075af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from PIL import Image, ImageDraw
import os
import math

# COCO label id -> display name for the object classes this pipeline keeps.
# Detections whose label is not a key here are dropped by extract_features.
TARGET_CLASSES = dict([
    (1, 'Person'),
    (2, 'Bicycle'),
    (3, 'Car'),
    (4, 'Motorcycle'),
])

def load_perception_model():
    """
    Build the Faster R-CNN (ResNet-50-FPN) detector with its default weights.

    Returns a ``(model, weights)`` tuple: the detector switched to evaluation
    mode, and the weights enum entry it was built from (callers use it to
    obtain the matching preprocessing transforms).
    """
    print("[System] Loading Faster R-CNN (ResNet-50-FPN Backbone)...")
    pretrained = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    # nn.Module.eval() returns the module itself, so the call can be chained.
    detector = fasterrcnn_resnet50_fpn(weights=pretrained, progress=False).eval()
    return detector, pretrained

def extract_features(img_path, model, weights, score_threshold=0.7):
    """
    Run the detector on a single image and keep the confident target objects.

    img_path:        path of the image file to open.
    model:           detection model; called on a one-image batch.
    weights:         object whose ``transforms()`` yields the preprocessing.
    score_threshold: detections must score strictly above this to be kept.

    Returns ``(extracted, image)`` where ``image`` is the RGB PIL image and
    ``extracted`` is a list of dicts with keys 'type' (class name from
    TARGET_CLASSES), 'bbox' (box as a plain list) and 'coord' (the
    bottom-center point of the box).
    """
    image = Image.open(img_path).convert("RGB")
    to_model_input = weights.transforms()
    batch = to_model_input(image).unsqueeze(0)  # add the batch dimension

    with torch.no_grad():
        outputs = model(batch)
    prediction = outputs[0]  # single image in the batch -> single result

    extracted = []
    detections = zip(prediction['boxes'], prediction['scores'], prediction['labels'])
    for box_tensor, score_tensor, label_tensor in detections:
        score = score_tensor.item()
        label = label_tensor.item()

        # Skip low-confidence detections and classes we do not track.
        if not (score > score_threshold and label in TARGET_CLASSES):
            continue

        corners = box_tensor.tolist()
        # Bottom-center of the box is the point used for BEV mapping.
        anchor = ((corners[0] + corners[2]) / 2.0, corners[3])
        extracted.append({
            'type': TARGET_CLASSES[label],
            'bbox': corners,
            'coord': anchor
        })
    return extracted, image

def calculate_distance(c1, c2):
    """Return the Euclidean distance between two (x, y) points."""
    delta_x = c1[0] - c2[0]
    delta_y = c1[1] - c2[1]
    return math.sqrt(delta_x**2 + delta_y**2)

def _nearest_same_class(target, candidates, max_dist):
    """
    Return the candidate of the same class that lies closest to ``target``.

    Only candidates whose 'type' equals ``target['type']`` and whose 'coord'
    is strictly closer than ``max_dist`` are considered. Returns None when no
    candidate qualifies. On an exact distance tie the earlier candidate wins.
    """
    best_match = None
    min_dist = float('inf')
    for candidate in candidates:
        if candidate['type'] != target['type']:  # Must be same class
            continue
        dist = calculate_distance(candidate['coord'], target['coord'])
        # Keep the candidate only if it is within range AND closer than the
        # best one so far; without the second test the last in-range
        # candidate would win instead of the nearest one.
        if dist < max_dist and dist < min_dist:
            min_dist = dist
            best_match = candidate
    return best_match


def process_frame_sequence(frame1_path, frame2_path, model, weights,
                           *, max_match_dist=50.0, min_motion=1.0):
    """
    Takes 2 sequential frames, detects objects, matches them to find movement,
    and bridges the data to the AI Brain.

    frame1_path:    image path of the earlier frame (T-1).
    frame2_path:    image path of the later frame (T0).
    model, weights: detector and weights, passed through to extract_features.
    max_match_dist: maximum pixel distance between two detections for them to
                    be linked as the same object (default 50.0).
    min_motion:     an object counts as moving when |dx| or |dy| exceeds this
                    many pixels (default 1.0).

    Returns a list of dicts, one per moving 'Person' or 'Bicycle', with keys
    "type" and "history"; "history" is [(x_t-1, y_t-1), (x_t0, y_t0)].
    """
    print(f"\n[Step 1] Analyzing Frame T-1: {os.path.basename(frame1_path)}")
    objs_f1, _ = extract_features(frame1_path, model, weights)

    print(f"[Step 2] Analyzing Frame T0: {os.path.basename(frame2_path)}")
    objs_f2, _ = extract_features(frame2_path, model, weights)

    print("\n[Step 3] Temporal Tracking (Finding Moving Cyclists/Pedestrians)")
    tracked_history = []

    # Simple tracking: link each Frame 2 object to its nearest same-class
    # object in Frame 1.
    for obj2 in objs_f2:
        best_match = _nearest_same_class(obj2, objs_f1, max_match_dist)
        if best_match is None:
            continue

        # Calculate movement vector (Velocity)
        dx = obj2['coord'][0] - best_match['coord'][0]
        dy = obj2['coord'][1] - best_match['coord'][1]
        is_moving = abs(dx) > min_motion or abs(dy) > min_motion

        if is_moving and obj2['type'] in ('Person', 'Bicycle'):
            print(f" -> Spotted Moving {obj2['type']}! dx: {dx:.2f}, dy: {dy:.2f}")

            # Format: [(x_t-1, y_t-1), (x_t0, y_t0)]
            tracked_history.append({
                "type": obj2['type'],
                "history": [best_match['coord'], obj2['coord']]
            })

    print(f"\n[Step 4] Handoff to AI Brain: Found {len(tracked_history)} moving VRUs.")
    return tracked_history

if __name__ == '__main__':
    # Demo entry point: run the pipeline on the first two CAM_FRONT images.
    import glob
    # glob.glob returns matches in arbitrary order, so sort them to make the
    # chosen pair deterministic.
    # NOTE(review): assumes filename order equals capture order - confirm.
    cam_front_images = sorted(glob.glob("DataSet/samples/CAM_FRONT/*.jpg"))

    if len(cam_front_images) >= 2:
        f1 = cam_front_images[0]
        f2 = cam_front_images[1]  # Next frame in sorted order

        try:
            model, weights = load_perception_model()
            vru_data_for_ai = process_frame_sequence(f1, f2, model, weights)

            print("\n--- FINAL JSON PAYLOAD FOR TRANSFORMER MODEL ---")
            for person in vru_data_for_ai:
                print(f"Target: {person['type']}")
                print(f"Historical Trajectory [T-1, T0]: {person['history']}")
        except Exception as e:
            # Best-effort demo: do not crash, but do report the real cause
            # instead of discarding it.
            print("Model not loaded, but script structure is ready.")
            print(f"Reason: {type(e).__name__}: {e}")
    else:
        # Previously this case exited silently with no output at all.
        print(f"Need at least 2 images in DataSet/samples/CAM_FRONT/, found {len(cam_front_images)}.")