AIBRUH committed on
Commit
db72923
·
verified ·
1 Parent(s): ac8abe2

Upload folder using huggingface_hub

Browse files
Files changed (2) hide show
  1. watchdog/__init__.py +0 -0
  2. watchdog/main.py +194 -0
watchdog/__init__.py ADDED
File without changes
watchdog/main.py ADDED
@@ -0,0 +1,194 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """EDEN OS V2 — Elevated Watchdog Sidecar.
2
+
3
+ Runs every 3 seconds:
4
+ - Samples the last 15 frames from the shared frame directory
5
+ - Computes motion score (mean absolute difference of raw frame bytes); expression variance is not implemented in this file
6
+ - Calls router's /evaluate-failover for agent-enhanced decision
7
+ - Falls back to consecutive_bad >= 2 rule if agent unavailable
8
+ """
9
+
10
+ import asyncio
11
+ import base64
12
+ import logging
13
+ import time
14
+ from pathlib import Path
15
+
16
+ import httpx
17
+ import numpy as np
18
+ from fastapi import FastAPI
19
+ from pydantic_settings import BaseSettings
20
+
21
+ logger = logging.getLogger("eden.watchdog")
22
+
23
+
24
class WatchdogSettings(BaseSettings):
    """Configuration for the watchdog sidecar.

    Declared as a pydantic-settings model that points at a ``.env`` file and
    ignores keys it does not declare.
    """

    model_config = {"env_file": ".env", "extra": "ignore"}

    # Base URL of the router service (status, evaluate-failover, failover).
    router_url: str = "http://router:8100"
    # Root of the shared volume; frames are read from "<shared_dir>/frames".
    shared_dir: str = "/shared"
    # Seconds slept between two watchdog checks.
    check_interval: float = 3.0
    # Motion scores below this value count as a bad check.
    motion_threshold: float = 0.05
    # Bad checks in a row before the fallback rule asks for a failover.
    consecutive_fails_to_failover: int = 2
31
+
32
+
33
# Module-level singletons: parsed settings and the FastAPI application.
cfg = WatchdogSettings()
app = FastAPI(title="EDEN Watchdog", version="2.0.0")

# Mutable watchdog state, written by the check loop and reported by /health.
consecutive_bad: dict[int, int] = {}  # pipeline id -> bad checks in a row
last_check_time: float = 0.0  # time.time() of the most recent scored check
last_check_result: dict = {}  # summary dict of the most recent scored check
watchdog_running: bool = False  # flipped to True when the check loop starts
41
+
42
+
43
def _read_recent_frames(frame_dir: Path, n: int = 15) -> list[bytes]:
    """Read the ``n`` most recently modified ``*.jpg`` files in ``frame_dir``.

    Args:
        frame_dir: Directory holding the frame files.
        n: Maximum number of frames to return; values <= 0 yield ``[]``.

    Returns:
        Raw file contents ordered oldest to newest. Files that vanish or
        cannot be read while being sampled are skipped. An empty list is
        returned when ``frame_dir`` is not an existing directory.
    """
    if n <= 0 or not frame_dir.is_dir():
        return []

    # stat() each candidate defensively: a file may be removed between glob()
    # and stat(), and that must not abort the whole sample.
    dated: list[tuple[float, Path]] = []
    for path in frame_dir.glob("*.jpg"):
        try:
            dated.append((path.stat().st_mtime, path))
        except OSError:
            continue

    # Newest first so the slice below keeps the n most recent files.
    dated.sort(key=lambda item: item[0], reverse=True)

    frames: list[bytes] = []
    for _, path in dated[:n]:
        try:
            frames.append(path.read_bytes())
        except OSError:
            # Best effort: a frame that disappears mid-read is left out.
            continue

    # Callers expect chronological order (oldest first).
    frames.reverse()
    return frames
55
+
56
+
57
def compute_motion_score(frames: list[bytes]) -> float:
    """Score frame-to-frame change from raw frame bytes.

    Each consecutive pair is compared byte-wise over its common prefix. The
    mean absolute difference, averaged over all comparable pairs, is divided
    by 25 and capped at 1.0.

    Args:
        frames: Frame payloads in chronological order.

    Returns:
        A value in ``[0.0, 1.0]``: ``0.0`` when fewer than two frames are
        given or no pair shares any bytes, ``0.5`` if scoring raises.
    """
    if len(frames) < 2:
        return 0.0
    try:
        # int16 holds the full -255..255 range of a uint8 difference.
        arrays = [np.frombuffer(f, dtype=np.uint8).astype(np.int16) for f in frames]
        diffs = []
        for prev, curr in zip(arrays, arrays[1:]):
            overlap = min(len(prev), len(curr))
            if overlap == 0:
                # The mean of an empty slice is NaN, and min(1.0, NaN) is 1.0,
                # so an empty frame would otherwise report maximum motion.
                continue
            diffs.append(np.abs(curr[:overlap] - prev[:overlap]).mean())
        if not diffs:
            return 0.0
        return min(1.0, float(np.mean(diffs) / 25.0))
    except Exception as e:
        logger.warning(f"Motion score error: {e}")
        return 0.5
73
+
74
+
75
async def _evaluate_failover_with_agent(pipeline_id: int, motion_score: float, consecutive: int, frame_count: int) -> bool:
    """Ask the router's /evaluate-failover endpoint whether to fail over.

    Args:
        pipeline_id: Id of the pipeline under evaluation.
        motion_score: Motion score of the current check.
        consecutive: Number of bad checks in a row for this pipeline.
        frame_count: Number of frames the score was computed from.

    Returns:
        The router's boolean ``should_failover`` decision when it answers
        HTTP 200 with one. In every other case (transport error, non-200
        status, missing or non-boolean decision) the threshold rule
        ``consecutive >= cfg.consecutive_fails_to_failover`` is used.
    """
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.post(
                f"{cfg.router_url}/evaluate-failover",
                json={
                    "pipeline_id": pipeline_id,
                    "motion_score": motion_score,
                    "consecutive_bad": consecutive,
                    "frame_count": frame_count,
                },
            )
            if resp.status_code == 200:
                data = resp.json()
                decision = data.get("should_failover") if isinstance(data, dict) else None
                # Only trust a real JSON boolean; anything else (missing key,
                # string, null) is treated as "no usable answer".
                if isinstance(decision, bool):
                    logger.info(f"Agent failover decision for P{pipeline_id}: {decision}")
                    return decision
                logger.warning(
                    f"Agent failover eval for P{pipeline_id} returned no boolean decision, "
                    "using threshold fallback"
                )
            else:
                logger.warning(
                    f"Agent failover eval for P{pipeline_id} returned HTTP {resp.status_code}, "
                    "using threshold fallback"
                )
    except Exception as e:
        logger.warning(f"Agent failover eval failed, using threshold fallback: {e}")

    # Fallback: original consecutive-bad-checks rule.
    return consecutive >= cfg.consecutive_fails_to_failover
100
+
101
+
102
async def _trigger_failover(pipeline_id: int) -> bool:
    """POST a failover request for ``pipeline_id`` to the router.

    Args:
        pipeline_id: Id of the pipeline to report as failed.

    Returns:
        ``True`` when the router answered with a 2xx status, ``False`` when
        the request raised or the router answered with any other status.
        Errors are logged, never raised.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.post(
                f"{cfg.router_url}/failover",
                json={"pipeline_id": pipeline_id},
            )
    except Exception as e:
        logger.error(f"Failover trigger failed: {e}")
        return False

    if 200 <= resp.status_code < 300:
        logger.warning(f"Failover triggered for pipeline {pipeline_id}: {resp.status_code}")
        return True

    # A reachable router that refuses the request is not a successful failover.
    logger.error(f"Failover request for pipeline {pipeline_id} rejected: {resp.status_code}")
    return False
113
+
114
+
115
async def _fetch_active_pipelines() -> list[dict]:
    """Return the router's pipelines in "ready" or "busy" state ([] on failure)."""
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{cfg.router_url}/status")
            status = resp.json()
        return [
            p for p in status.get("pipelines", [])
            if "id" in p and p.get("status") in ("ready", "busy")
        ]
    except Exception as e:
        # Without a pipeline list there is nothing to act on this round.
        logger.warning(f"Router status fetch failed, skipping pipeline checks: {e}")
        return []


async def _run_check(frame_dir: Path) -> None:
    """Run one watchdog pass: score recent frames, then update pipeline counters."""
    global last_check_time, last_check_result

    # The directory scan and file reads block; keep them off the event loop.
    frames = await asyncio.to_thread(_read_recent_frames, frame_dir)
    if len(frames) < 2:
        return

    motion = compute_motion_score(frames)
    last_check_time = time.time()

    active_pipelines = await _fetch_active_pipelines()

    last_check_result = {
        "motion_score": round(motion, 3),
        "frame_count": len(frames),
        "is_healthy": motion >= cfg.motion_threshold,
        "active_pipelines": len(active_pipelines),
        "timestamp": last_check_time,
    }

    if motion < cfg.motion_threshold:
        # Bad check: every active pipeline gets a strike, then the router's
        # agent-enhanced endpoint decides whether to fail over.
        for p in active_pipelines:
            pid = p["id"]
            consecutive_bad[pid] = consecutive_bad.get(pid, 0) + 1
            logger.warning(
                f"Bad check #{consecutive_bad[pid]} for pipeline {p.get('name', pid)} "
                f"(motion={motion:.3f} < {cfg.motion_threshold})"
            )

            should_failover = await _evaluate_failover_with_agent(
                pid, motion, consecutive_bad[pid], len(frames)
            )
            if should_failover:
                await _trigger_failover(pid)
                consecutive_bad[pid] = 0
    else:
        # Good check: reset counters in place (/health reports this dict).
        for pid in consecutive_bad:
            consecutive_bad[pid] = 0


async def _check_loop():
    """Main watchdog loop: run one check, then sleep ``cfg.check_interval`` seconds.

    Sets ``watchdog_running`` to True on entry and back to False when the
    loop ends for any reason, including task cancellation. An exception
    raised by a single check is logged with its traceback and does not stop
    the loop.

    NOTE(review): frame age is not checked, so old frames left on disk are
    scored like fresh ones; confirm whether the producer cleans them up.
    """
    global watchdog_running
    watchdog_running = True
    frame_dir = Path(cfg.shared_dir) / "frames"

    logger.info(f"Watchdog loop started (interval={cfg.check_interval}s, threshold={cfg.motion_threshold})")

    try:
        while watchdog_running:
            try:
                await _run_check(frame_dir)
            except Exception as e:
                logger.exception(f"Watchdog check error: {e}")

            await asyncio.sleep(cfg.check_interval)
    finally:
        # Keep /health truthful once the loop is no longer running.
        watchdog_running = False
179
+
180
+
181
@app.get("/health")
async def health():
    """Report the loop's running flag, the latest check summary and the bad-check counters."""
    return dict(
        status="ok",
        watchdog_running=watchdog_running,
        last_check=last_check_result,
        consecutive_bad=consecutive_bad,
    )
189
+
190
+
191
@app.on_event("startup")
async def startup():
    """Launch the watchdog loop as a background task when the app starts."""
    logger.info("EDEN Watchdog starting...")
    # The event loop keeps only weak references to tasks; hold a strong one on
    # the app so the loop cannot be garbage-collected while it is running.
    app.state.watchdog_task = asyncio.create_task(_check_loop())