File size: 3,138 Bytes
3a66575
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import json

import cv2
import numpy as np
import torch

from motion_estimator import GlobalMotionEstimator
from report_generator import ReportGenerator
from tracker import Tracker


# ANSI SGR escape sequences used to colour the PASS/FAIL labels printed by
# run_test.
GREEN = "\033[92m"  # bright green foreground
RED = "\033[91m"  # bright red foreground
RESET = "\033[0m"  # reset all text attributes


def run_test(name, fn):
    """Run one test callable and print a coloured PASS/FAIL line for it.

    Args:
        name: Human-readable label printed next to the result.
        fn: Zero-argument callable; the test passes if it returns without
            raising.

    Returns:
        True if ``fn`` completed normally, False if it raised an
        ``Exception`` (which is reported on stdout, not propagated).
    """
    try:
        fn()
    except Exception as exc:  # runner boundary: report the failure, keep going
        # A bare ``assert`` fails with an empty message, so printing only
        # ``str(exc)`` would leave the FAIL line without any diagnostic.
        # Always include the exception type, plus the message when present.
        message = str(exc)
        kind = type(exc).__name__
        detail = f"{kind}: {message}" if message else kind
        print(f"{RED}FAIL{RESET} {name}: {detail}")
        return False
    print(f"{GREEN}PASS{RESET} {name}")
    return True


def test_model_loading():
    """Check that ``models.build_model`` returns a model object.

    The builder is handed a minimal stand-in argument object exposing only
    ``backbone``, ``row`` and ``line``.
    """
    # Imported lazily so an import failure is reported as this test failing
    # rather than aborting the whole script at import time.
    from models import build_model

    class Args:
        # NOTE(review): row/line are presumably anchor-grid settings of the
        # model; confirm against models.build_model.
        backbone = "vgg16_bn"
        row = 2
        line = 2

    config = Args()
    built = build_model(config)
    assert built is not None


def test_dummy_inference():
    """Push one all-zero image tensor through the model and check its output.

    The model is built with the same minimal arguments as the loading test,
    moved to CUDA when available (CPU otherwise), switched to eval mode and
    called once under ``torch.inference_mode``. The result must contain the
    keys ``pred_logits`` and ``pred_points``, and the first dimension of
    ``pred_logits`` must be 1.
    """
    from models import build_model

    class Args:
        backbone = "vgg16_bn"
        row = 2
        line = 2

    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    model = build_model(Args())
    model = model.to(device)
    model = model.eval()

    # Single 3-channel 512x512 float32 input filled with zeros.
    dummy = torch.zeros((1, 3, 512, 512), dtype=torch.float32, device=device)
    with torch.inference_mode():
        out = model(dummy)

    for key in ("pred_logits", "pred_points"):
        assert key in out
    # The leading dimension is presumably the batch size; it must match the
    # single input image.
    assert out["pred_logits"].shape[0] == 1


def test_tracker_persistence():
    """Check that one slowly drifting point keeps the same track id.

    A single detection is fed to the tracker for 30 consecutive updates on a
    blank 128x128 frame, moving by (0.2, 0.1) per step. Every update must
    return at least one track, the first track's id must never change, the
    anomaly flag must be ``False``, and the final total must be 1.
    """
    tracker = Tracker(max_distance=20, max_age=3)
    blank = np.zeros((128, 128, 3), dtype=np.uint8)

    expected_id = None
    for step in range(30):
        point = [50 + step * 0.2, 50 + step * 0.1]
        tracks, total, anomaly = tracker.update(blank, [point])
        assert tracks
        lead_id = tracks[0].id
        if expected_id is None:
            # First update: remember the id every later update must match.
            expected_id = lead_id
        assert lead_id == expected_id
        assert anomaly is False
    # ``total`` is the value returned by the last update.
    assert total == 1


def test_motion_compensation():
    """Check that the motion estimator returns a 2x3 array for each frame.

    The first frame is a black 200x200 image with a filled white circle and a
    filled white rectangle; the second is the same image rolled 5 pixels
    along axis 1. Both ``update`` results must have shape ``(2, 3)``.
    """
    estimator = GlobalMotionEstimator()

    white = (255, 255, 255)
    base = np.zeros((200, 200, 3), dtype=np.uint8)
    cv2.circle(base, (80, 80), 10, white, -1)  # thickness -1 => filled
    cv2.rectangle(base, (120, 120), (150, 150), white, -1)
    shifted = np.roll(base, shift=5, axis=1)

    transforms = [estimator.update(image) for image in (base, shifted)]
    for transform in transforms:
        # NOTE(review): (2, 3) is presumably an affine matrix; only the shape
        # is checked here, not the estimated shift.
        assert transform.shape == (2, 3)


def test_alert_thresholds():
    """Check that each current/threshold ratio is non-negative.

    The cases cover counts of 50, 75, 90 and 100 against a threshold of 100.

    NOTE(review): this only exercises the division itself; no alerting code
    is called, so the check cannot fail for these fixed inputs.
    """
    cases = [(50, 100), (75, 100), (90, 100), (100, 100)]
    ratios = (current / threshold for current, threshold in cases)
    assert all(ratio >= 0 for ratio in ratios)


def test_report_exports():
    """Check the CSV and JSON exports after recording a single frame.

    The CSV bytes must contain ``frame_number``, and the JSON bytes (decoded
    as UTF-8) must have ``timeline[0]["frame_count"] == 10``.
    """
    generator = ReportGenerator()
    # NOTE(review): positional meanings are not visible here; the third value
    # (10) is presumably the frame count asserted below. Confirm against
    # ReportGenerator.add_frame_data.
    generator.add_frame_data(1, 0.03, 10, 11)

    csv_bytes = generator.get_csv()
    json_bytes = generator.get_json()

    assert b"frame_number" in csv_bytes
    timeline = json.loads(json_bytes.decode("utf-8"))["timeline"]
    assert timeline[0]["frame_count"] == 10


def main():
    """Run every smoke test, print a summary and exit with a status code.

    Every test is run even if an earlier one fails. The process exits via
    ``SystemExit`` with code 0 when all tests passed and 1 otherwise.
    """
    suite = (
        ("model loading", test_model_loading),
        ("dummy image inference", test_dummy_inference),
        ("tracker ID persistence across 30 frames", test_tracker_persistence),
        ("motion compensation transform", test_motion_compensation),
        ("alert thresholds at 50/75/90/100%", test_alert_thresholds),
        ("CSV/JSON export", test_report_exports),
    )

    outcomes = []
    for label, check in suite:
        outcomes.append(run_test(label, check))

    # run_test returns booleans, so summing them counts the passes.
    n_passed = sum(outcomes)
    n_total = len(outcomes)
    print(f"{n_passed}/{n_total} tests passed")

    exit_code = 0 if n_passed == n_total else 1
    raise SystemExit(exit_code)


# Run the smoke tests only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()