stevenkhan commited on
Commit
098b42d
·
verified ·
1 Parent(s): 165a194

Upload clashcr/core/event_detector.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. clashcr/core/event_detector.py +237 -0
clashcr/core/event_detector.py ADDED
@@ -0,0 +1,237 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Event detector: find candidate card-play moments from temporal changes.
2
+
3
+ For normal live view:
4
+ - Temporal frame differencing around deployment moments
5
+ - Suppress continuous combat motion (persistent units)
6
+ - Require clear new spawn / effect event
7
+ - Separate opponent-side evidence from own-side evidence
8
+ - Save debug crops/masks for every candidate event
9
+ """
10
+ from __future__ import annotations
11
+
12
+ import logging
13
+ import time
14
+ from dataclasses import dataclass, field
15
+ from pathlib import Path
16
+ from typing import List, Optional, Tuple
17
+
18
+ import cv2
19
+ import numpy as np
20
+
21
+ logger = logging.getLogger(__name__)
22
+
23
+
24
+ @dataclass
25
+ class CandidateEvent:
26
+ timestamp: float
27
+ frame_idx: int
28
+ side: str # 'opponent', 'own', 'unknown'
29
+ bbox: Tuple[int, int, int, int] # x, y, w, h in full-frame coords
30
+ diff_mask: np.ndarray = field(repr=False)
31
+ crop: np.ndarray = field(repr=False)
32
+ reason: str = "" # why this event was triggered
33
+ suppressed: bool = False
34
+ suppression_reason: str = ""
35
+
36
+
37
+ class EventDetector:
38
+ """Detects deployment events using temporal differencing and motion suppression.
39
+
40
+ Pipeline:
41
+ 1. Convert frame to grayscale and downscale for speed.
42
+ 2. Compute absolute diff with previous frame.
43
+ 3. Threshold diff to get motion mask.
44
+ 4. Remove persistent motion (units already tracked for >N frames).
45
+ 5. Find connected components in residual mask.
46
+ 6. Classify components by side (opponent top, own bottom).
47
+ 7. Filter by size, shape, and temporal consistency.
48
+ 8. Return candidate events.
49
+ """
50
+
51
+ def __init__(self,
52
+ diff_threshold: int = 25,
53
+ min_event_area: int = 200,
54
+ max_event_area: int = 50000,
55
+ persistence_frames: int = 15,
56
+ opponent_y_ratio: float = 0.45,
57
+ own_y_ratio: float = 0.55,
58
+ temporal_confirmation: int = 2,
59
+ debug_dir: Optional[str] = None):
60
+ self.diff_threshold = diff_threshold
61
+ self.min_event_area = min_event_area
62
+ self.max_event_area = max_event_area
63
+ self.persistence_frames = persistence_frames
64
+ self.opponent_y_ratio = opponent_y_ratio
65
+ self.own_y_ratio = own_y_ratio
66
+ self.temporal_confirmation = temporal_confirmation
67
+ self.debug_dir = Path(debug_dir) if debug_dir else None
68
+
69
+ self._prev_gray: Optional[np.ndarray] = None
70
+ self._prev_mask: Optional[np.ndarray] = None
71
+ self._persistence_map: Optional[np.ndarray] = None
72
+ self._frame_idx = 0
73
+ self._pending_events: List[dict] = []
74
+
75
+ def _preprocess(self, frame: np.ndarray) -> np.ndarray:
76
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
77
+ # Downscale for speed (keep aspect)
78
+ h, w = gray.shape
79
+ if w > 640:
80
+ scale = 640 / w
81
+ gray = cv2.resize(gray, None, fx=scale, fy=scale)
82
+ return gray
83
+
84
+ def _update_persistence(self, motion_mask: np.ndarray) -> np.ndarray:
85
+ """Track how long motion has persisted in each pixel.
86
+ Returns mask of newly appearing motion."""
87
+ if self._persistence_map is None:
88
+ self._persistence_map = np.zeros_like(motion_mask, dtype=np.uint8)
89
+
90
+ # Increment where motion is active, decay where it is not
91
+ self._persistence_map = np.where(motion_mask > 0,
92
+ self._persistence_map + 1,
93
+ np.maximum(self._persistence_map - 2, 0))
94
+
95
+ # New motion: pixels that are active but have low persistence
96
+ new_motion = (motion_mask > 0) & (self._persistence_map <= self.persistence_frames)
97
+ return new_motion.astype(np.uint8) * 255
98
+
99
+ def _classify_side(self, cy: float, frame_h: int) -> str:
100
+ """Classify event side based on vertical position.
101
+
102
+ In standard portrait Clash Royale:
103
+ - Opponent side is top ~45% of arena
104
+ - Own side is bottom ~55% of arena
105
+ """
106
+ ratio = cy / frame_h
107
+ if ratio < self.opponent_y_ratio:
108
+ return "opponent"
109
+ elif ratio > self.own_y_ratio:
110
+ return "own"
111
+ return "unknown"
112
+
113
+ def process(self, frame: np.ndarray, battle_result) -> List[CandidateEvent]:
114
+ h, w = frame.shape[:2]
115
+ gray = self._preprocess(frame)
116
+ gh, gw = gray.shape
117
+
118
+ candidates: List[CandidateEvent] = []
119
+
120
+ if self._prev_gray is not None and self._prev_gray.shape == gray.shape:
121
+ diff = cv2.absdiff(gray, self._prev_gray)
122
+ _, motion_mask = cv2.threshold(diff, self.diff_threshold, 255, cv2.THRESH_BINARY)
123
+
124
+ # Morphological cleanup
125
+ kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
126
+ motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_OPEN, kernel)
127
+ motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_CLOSE, kernel)
128
+
129
+ # Remove persistent motion
130
+ new_motion = self._update_persistence(motion_mask)
131
+
132
+ # Scale mask back to original resolution for ROI extraction
133
+ if new_motion.shape != (h, w):
134
+ new_motion_full = cv2.resize(new_motion, (w, h), interpolation=cv2.INTER_NEAREST)
135
+ else:
136
+ new_motion_full = new_motion
137
+
138
+ # Find connected components
139
+ num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(new_motion_full, connectivity=8)
140
+
141
+ for i in range(1, num_labels):
142
+ x, y, cw, ch, area = stats[i]
143
+ if not (self.min_event_area <= area <= self.max_event_area):
144
+ continue
145
+
146
+ cx, cy = centroids[i]
147
+ side = self._classify_side(cy, h)
148
+
149
+ # Skip own-side placements (we only care about opponent)
150
+ if side == "own":
151
+ continue
152
+
153
+ # Extract crop and mask
154
+ pad = 20
155
+ x1 = max(0, x - pad)
156
+ y1 = max(0, y - pad)
157
+ x2 = min(w, x + cw + pad)
158
+ y2 = min(h, y + ch + pad)
159
+ crop = frame[y1:y2, x1:x2].copy()
160
+ mask = new_motion_full[y1:y2, x1:x2].copy()
161
+
162
+ event = CandidateEvent(
163
+ timestamp=time.monotonic(),
164
+ frame_idx=self._frame_idx,
165
+ side=side,
166
+ bbox=(x1, y1, x2 - x1, y2 - y1),
167
+ diff_mask=mask,
168
+ crop=crop,
169
+ reason=f"new_motion_area={area}_side={side}",
170
+ )
171
+ candidates.append(event)
172
+
173
+ self._prev_gray = gray
174
+ self._frame_idx += 1
175
+
176
+ # Temporal confirmation: require candidate to appear in consecutive frames
177
+ confirmed = self._temporal_confirm(candidates)
178
+
179
+ # Save debug crops
180
+ if self.debug_dir:
181
+ self.debug_dir.mkdir(parents=True, exist_ok=True)
182
+ for ev in confirmed:
183
+ if ev.suppressed:
184
+ continue
185
+ ts = int(ev.timestamp * 1000)
186
+ crop_path = self.debug_dir / f"event_{ts}_{ev.side}.jpg"
187
+ mask_path = self.debug_dir / f"event_{ts}_{ev.side}_mask.png"
188
+ cv2.imwrite(str(crop_path), ev.crop)
189
+ cv2.imwrite(str(mask_path), ev.diff_mask)
190
+
191
+ return confirmed
192
+
193
+ def _temporal_confirm(self, candidates: List[CandidateEvent]) -> List[CandidateEvent]:
194
+ """Require events to persist across multiple frames near the same location."""
195
+ # Simple implementation: store pending and match by IoU
196
+ new_pending = []
197
+ confirmed = []
198
+
199
+ for cand in candidates:
200
+ matched = False
201
+ for pending in self._pending_events:
202
+ if self._iou(cand.bbox, pending["bbox"]) > 0.3:
203
+ pending["frames"] += 1
204
+ pending["last"] = cand
205
+ matched = True
206
+ if pending["frames"] >= self.temporal_confirmation:
207
+ confirmed.append(cand)
208
+ break
209
+ if not matched:
210
+ new_pending.append({"bbox": cand.bbox, "frames": 1, "last": cand})
211
+
212
+ self._pending_events = new_pending + [p for p in self._pending_events if p["frames"] < self.temporal_confirmation]
213
+ return confirmed
214
+
215
+ @staticmethod
216
+ def _iou(a: Tuple[int, int, int, int], b: Tuple[int, int, int, int]) -> float:
217
+ ax, ay, aw, ah = a
218
+ bx, by, bw, bh = b
219
+ inter_x1 = max(ax, bx)
220
+ inter_y1 = max(ay, by)
221
+ inter_x2 = min(ax + aw, bx + bw)
222
+ inter_y2 = min(ay + ah, by + bh)
223
+ inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
224
+ union_area = aw * ah + bw * bh - inter_area
225
+ return inter_area / union_area if union_area > 0 else 0.0
226
+
227
+ def reset(self) -> None:
228
+ self._prev_gray = None
229
+ self._prev_mask = None
230
+ self._persistence_map = None
231
+ self._frame_idx = 0
232
+ self._pending_events = []
233
+
234
+ def suppress_own_overlay(self, frame: np.ndarray, own_roi: Tuple[int, int, int, int]) -> None:
235
+ """Mask out own card overlay / hand card region to prevent false positives."""
236
+ # This is a hint; actual suppression happens in process() by side classification
237
+ pass