e1250 commited on
Commit
efc36ce
·
1 Parent(s): 3d237cb
api/dependencies.py CHANGED
@@ -1,14 +1,18 @@
1
  # Here exists function to use instead of using app.state directly in the main.py
2
  from fastapi.requests import HTTPConnection
3
 
 
4
  def get_detection_model(request: HTTPConnection):
5
  return request.app.state.detection_model
6
 
 
7
  def get_depth_model(request: HTTPConnection):
8
  return request.app.state.depth_model
9
 
 
10
  def get_safety_detection_model(request: HTTPConnection):
11
  return request.app.state.safety_detection_model
12
 
 
13
  def get_redis(request: HTTPConnection):
14
- return request.app.state.redis
 
1
  # Here exists function to use instead of using app.state directly in the main.py
2
  from fastapi.requests import HTTPConnection
3
 
4
+
5
def get_detection_model(request: HTTPConnection):
    """Resolve the object-detection model from the application's shared state.

    Used as a FastAPI dependency (via Depends) instead of reaching into
    app.state directly inside route handlers.
    """
    app_state = request.app.state
    return app_state.detection_model
7
 
8
+
9
def get_depth_model(request: HTTPConnection):
    """Resolve the depth-estimation model from the application's shared state.

    FastAPI dependency; keeps route handlers decoupled from app.state.
    """
    app_state = request.app.state
    return app_state.depth_model
11
 
12
+
13
def get_safety_detection_model(request: HTTPConnection):
    """Resolve the safety-detection model from the application's shared state.

    FastAPI dependency; keeps route handlers decoupled from app.state.
    """
    app_state = request.app.state
    return app_state.safety_detection_model
15
 
16
+
17
def get_redis(request: HTTPConnection):
    """Resolve the shared Redis client from the application's state.

    FastAPI dependency; keeps route handlers decoupled from app.state.
    """
    app_state = request.app.state
    return app_state.redis
api/routers/camera_stream.py CHANGED
@@ -1,13 +1,17 @@
 
1
  from api.dependencies import get_safety_detection_model
2
  from api.dependencies import get_detection_model, get_depth_model
3
  import asyncio
4
  import itertools
5
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
6
- from pandas.core.frame import nested_data_to_arrays
7
- from ai.contracts.detector import DetectionResults
8
- from api.routers.metrics import active_cameras, decode_duration_seconds, depth_duration_seconds, detection_duration_seconds, frame_processing_duration_seconds
 
 
 
 
9
  from contracts.camera_metadata import CameraMetadata, DetectionMetadata
10
- import traceback
11
  import mlflow
12
  from utils.experiment import log_config
13
 
@@ -17,20 +21,21 @@ import time
17
 
18
  router = APIRouter()
19
 
 
20
  @router.websocket("/stream/{camera_id}")
21
  async def websocket_detect(
22
- websocket: WebSocket,
23
- camera_id:str,
24
  detector=Depends(get_detection_model),
25
  safety_detector=Depends(get_safety_detection_model),
26
- depth_model=Depends(get_depth_model)
27
- ):
28
  """
29
- WebSocket stream takes the frame pass it to the ai models, save it under the camera id provided in the url.
30
-
31
  url here is: ws://127.0.0.1:8000/detectors/stream/camera_id
32
  """
33
- # Yes, I asked the same questions, is using webscoket.app.state many times here is consuming. after checking, it is not performance consuming.
34
  state = websocket.app.state
35
  logger = state.logger
36
  # Using Depends is important and called Inversion Of Control (IoC)/ Dependency injection, and is important for testing.
@@ -39,14 +44,16 @@ async def websocket_detect(
39
  # Accepting the connection from the client
40
  await websocket.accept()
41
 
42
- # Logging and tracking action
43
  active_cameras.inc()
44
- await redis.sadd("cameras:active", camera_id) # Save connected camera name into redis
 
 
45
  logger.info(f"Client ID >>{camera_id}<< Connected...")
46
 
47
  step_counter = itertools.count()
48
 
49
- loop = asyncio.get_running_loop()
50
  # Queue removing old images in case they were being stacked
51
  frame_queue: asyncio.Queue = asyncio.Queue(maxsize=1)
52
 
@@ -62,83 +69,119 @@ async def websocket_detect(
62
  logger.debug("Frame Dropped", camera_id=camera_id)
63
  except asyncio.QueueEmpty:
64
  pass
65
-
66
  await frame_queue.put(frame_bytes)
67
  except WebSocketDisconnect:
68
  raise
69
-
70
  async def process_frames():
71
  try:
72
-
73
  logger.info(f"Camera {camera_id} start sending frames...")
74
 
75
- def decode_frame(fb): return cv.imdecode(np.frombuffer(fb, np.uint8), cv.IMREAD_COLOR)
 
76
 
77
- # Keep receiving messages in a loop until disconnection.
78
  while True:
79
  frame_bytes = await frame_queue.get()
80
-
81
  # Profiling
82
- t0 = time.time()
83
- image_array = await loop.run_in_executor(None, decode_frame, frame_bytes)
84
- decode_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
85
- mlflow.log_metric("frame_processing_time", round(time.time() - t0, 3), next(step_counter))
 
 
 
 
 
 
 
 
86
 
87
  # Apply detection models
88
  t0 = time.time()
89
- detection_task = loop.run_in_executor(None, detector.detect, image_array)
90
- safety_task = loop.run_in_executor(None, safety_detector.detect, image_array)
91
- detections, safety_detection = await asyncio.gather(detection_task, safety_task)
92
- detection_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
93
- mlflow.log_metric("detection_duration_seconds", round(time.time() - t0, 3), next(step_counter))
 
 
 
 
 
 
 
 
 
 
 
 
94
 
95
  # Profiling
96
- frame_processing_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
 
 
97
  logger.debug("Frame processed", camera_id=camera_id)
98
- mlflow.log_metric("frame_processing duration time", round(time.time() - t0, 3), next(step_counter))
99
-
100
- boxes_center = []
101
- boxes_center_ratio = []
102
- for box in detections.detections:
103
- print(type(box))
104
- xmin, ymin, xmax, ymax = box.xyxy
105
- xcenter = (xmax + xmin) / 2
106
- ycenter = (ymax + ymin) / 2
107
- boxes_center.append((int(xcenter), int(ycenter)))
108
- boxes_center_ratio.append(xcenter / image_array.shape[1])
109
-
110
  t0 = time.time()
111
- depth_points = await loop.run_in_executor(None, depth_model.calculate_depth, image_array, boxes_center) if boxes_center else []
112
- depth_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
113
- mlflow.log_metric("depth_duration_seconds", round(time.time() - t0, 3), next(step_counter))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
 
115
- detection_metadata = [DetectionMetadata(depth=depth, xRatio=xRatio) for depth, xRatio in zip(depth_points, boxes_center_ratio)]
116
- metadata = CameraMetadata(camera_id=camera_id, is_danger = True if safety_detection else False, detection_metadata=detection_metadata)
117
-
118
  await redis.publish("dashboard_stream", metadata.model_dump_json())
119
  # Even if the camera was disconnected, redis is still going to show its data, which is not accurate.
120
  # Instead, we set expiry date for the camera data.
121
  await redis.setex(
122
- f"camera:{camera_id}:latest", # And this is the key, or tag
123
- 10, # in seconds
124
- metadata.model_dump_json()
125
  )
126
 
127
  # Note that JSONResponse doesn't work here, as it is for HTTP
128
  await websocket.send_json({"status": 200, "camera_id": camera_id})
129
-
130
  except Exception as e:
131
  logger.error(f"Processing Error: {e}", camera_id=camera_id)
132
  raise
133
-
134
- with mlflow.start_run(run_name=f'camera_{camera_id}', nested=True, parent_run_id=state.mlflow_run_id):
 
 
135
  log_config()
136
 
137
  try:
138
- await asyncio.gather(
139
- receive_frames(),
140
- process_frames()
141
- )
142
 
143
  except WebSocketDisconnect:
144
  logger.warn(f"Client ID >>{camera_id}<< Disconnected Normally...")
@@ -146,10 +189,12 @@ async def websocket_detect(
146
  except Exception as e:
147
  logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
148
  logger.exception(e)
149
- # This one is actually really better, it shows more details about the issue happened.
150
  # Also work on and create the logger.exception, as it directly controls printing more details about the issue happened.
151
  await websocket.close()
152
 
153
  finally:
154
- await redis.srem("cameras:active", camera_id) # Remove the camera from redis connected cameras
155
- active_cameras.dec()
 
 
 
1
+ from backend.domain.detection_box_center import calculate_detection_box_center
2
  from api.dependencies import get_safety_detection_model
3
  from api.dependencies import get_detection_model, get_depth_model
4
  import asyncio
5
  import itertools
6
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
7
+ from api.routers.metrics import (
8
+ active_cameras,
9
+ decode_duration_seconds,
10
+ depth_duration_seconds,
11
+ detection_duration_seconds,
12
+ frame_processing_duration_seconds,
13
+ )
14
  from contracts.camera_metadata import CameraMetadata, DetectionMetadata
 
15
  import mlflow
16
  from utils.experiment import log_config
17
 
 
21
 
22
  router = APIRouter()
23
 
24
+
25
  @router.websocket("/stream/{camera_id}")
26
  async def websocket_detect(
27
+ websocket: WebSocket,
28
+ camera_id: str,
29
  detector=Depends(get_detection_model),
30
  safety_detector=Depends(get_safety_detection_model),
31
+ depth_model=Depends(get_depth_model),
32
+ ):
33
  """
34
+ WebSocket stream takes the frame pass it to the ai models, save it under the camera id provided in the url.
35
+
36
  url here is: ws://127.0.0.1:8000/detectors/stream/camera_id
37
  """
38
+ # Yes, I asked the same question: is using websocket.app.state many times here costly? After checking, it is not a performance concern.
39
  state = websocket.app.state
40
  logger = state.logger
41
  # Using Depends is important and called Inversion Of Control (IoC)/ Dependency injection, and is important for testing.
 
44
  # Accepting the connection from the client
45
  await websocket.accept()
46
 
47
+ # Logging and tracking action
48
  active_cameras.inc()
49
+ await redis.sadd(
50
+ "cameras:active", camera_id
51
+ ) # Save connected camera name into redis
52
  logger.info(f"Client ID >>{camera_id}<< Connected...")
53
 
54
  step_counter = itertools.count()
55
 
56
+ loop = asyncio.get_running_loop()
57
  # Queue removing old images in case they were being stacked
58
  frame_queue: asyncio.Queue = asyncio.Queue(maxsize=1)
59
 
 
69
  logger.debug("Frame Dropped", camera_id=camera_id)
70
  except asyncio.QueueEmpty:
71
  pass
72
+
73
  await frame_queue.put(frame_bytes)
74
  except WebSocketDisconnect:
75
  raise
76
+
77
  async def process_frames():
78
  try:
 
79
  logger.info(f"Camera {camera_id} start sending frames...")
80
 
81
+ def decode_frame(fb):
82
+ return cv.imdecode(np.frombuffer(fb, np.uint8), cv.IMREAD_COLOR)
83
 
84
+ # Keep receiving messages in a loop until disconnection.
85
  while True:
86
  frame_bytes = await frame_queue.get()
87
+
88
  # Profiling
89
+ t0 = time.time()
90
+ image_array = await loop.run_in_executor(
91
+ None, decode_frame, frame_bytes
92
+ )
93
+ decode_duration_seconds.labels(camera_id).observe(
94
+ round(time.time() - t0, 3)
95
+ )
96
+ mlflow.log_metric(
97
+ "frame_processing_time",
98
+ round(time.time() - t0, 3),
99
+ next(step_counter),
100
+ )
101
 
102
  # Apply detection models
103
  t0 = time.time()
104
+ detection_task = loop.run_in_executor(
105
+ None, detector.detect, image_array
106
+ )
107
+ safety_task = loop.run_in_executor(
108
+ None, safety_detector.detect, image_array
109
+ )
110
+ detections, safety_detection = await asyncio.gather(
111
+ detection_task, safety_task
112
+ )
113
+ detection_duration_seconds.labels(camera_id).observe(
114
+ round(time.time() - t0, 3)
115
+ )
116
+ mlflow.log_metric(
117
+ "detection_duration_seconds",
118
+ round(time.time() - t0, 3),
119
+ next(step_counter),
120
+ )
121
 
122
  # Profiling
123
+ frame_processing_duration_seconds.labels(camera_id).observe(
124
+ round(time.time() - t0, 3)
125
+ )
126
  logger.debug("Frame processed", camera_id=camera_id)
127
+ mlflow.log_metric(
128
+ "frame_processing duration time",
129
+ round(time.time() - t0, 3),
130
+ next(step_counter),
131
+ )
132
+
133
+ boxes_center, boxes_center_ratio = calculate_detection_box_center(detections.detections, image_array.shape[1])
134
+
 
 
 
 
135
  t0 = time.time()
136
+ depth_points = (
137
+ await loop.run_in_executor(
138
+ None, depth_model.calculate_depth, image_array, boxes_center
139
+ )
140
+ if boxes_center
141
+ else []
142
+ )
143
+ depth_duration_seconds.labels(camera_id).observe(
144
+ round(time.time() - t0, 3)
145
+ )
146
+ mlflow.log_metric(
147
+ "depth_duration_seconds",
148
+ round(time.time() - t0, 3),
149
+ next(step_counter),
150
+ )
151
+
152
+ detection_metadata = [
153
+ DetectionMetadata(depth=depth, xRatio=xRatio)
154
+ for depth, xRatio in zip(depth_points, boxes_center_ratio)
155
+ ]
156
+ metadata = CameraMetadata(
157
+ camera_id=camera_id,
158
+ is_danger=True if safety_detection else False,
159
+ detection_metadata=detection_metadata,
160
+ )
161
 
 
 
 
162
  await redis.publish("dashboard_stream", metadata.model_dump_json())
163
  # Even if the camera was disconnected, redis is still going to show its data, which is not accurate.
164
  # Instead, we set expiry date for the camera data.
165
  await redis.setex(
166
+ f"camera:{camera_id}:latest", # And this is the key, or tag
167
+ 10, # in seconds
168
+ metadata.model_dump_json(),
169
  )
170
 
171
  # Note that JSONResponse doesn't work here, as it is for HTTP
172
  await websocket.send_json({"status": 200, "camera_id": camera_id})
173
+
174
  except Exception as e:
175
  logger.error(f"Processing Error: {e}", camera_id=camera_id)
176
  raise
177
+
178
+ with mlflow.start_run(
179
+ run_name=f"camera_{camera_id}", nested=True, parent_run_id=state.mlflow_run_id
180
+ ):
181
  log_config()
182
 
183
  try:
184
+ await asyncio.gather(receive_frames(), process_frames())
 
 
 
185
 
186
  except WebSocketDisconnect:
187
  logger.warn(f"Client ID >>{camera_id}<< Disconnected Normally...")
 
189
  except Exception as e:
190
  logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
191
  logger.exception(e)
192
+ # This one is actually really better, it shows more details about the issue happened.
193
  # Also work on and create the logger.exception, as it directly controls printing more details about the issue happened.
194
  await websocket.close()
195
 
196
  finally:
197
+ await redis.srem(
198
+ "cameras:active", camera_id
199
+ ) # Remove the camera from redis connected cameras
200
+ active_cameras.dec()
api/routers/dashboard_stream.py CHANGED
@@ -1,15 +1,14 @@
1
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
2
- from api.routers.metrics import active_dashboards
3
  import asyncio
4
- import traceback
5
- import redis.asyncio as aioredis
6
 
7
  router = APIRouter()
8
 
 
9
  @router.websocket("/stream")
10
  async def dashboard_websocket(websocket: WebSocket):
11
  """
12
- WebScoket sending updates to the dashboard.
13
 
14
  url: ws://127.0.0.1:8000/dashboard/stream
15
  """
@@ -17,10 +16,10 @@ async def dashboard_websocket(websocket: WebSocket):
17
  logger = state.logger
18
  redis = state.redis
19
 
20
- # Accept the client connection.
21
  await websocket.accept()
22
 
23
- # Logging and tracking
24
  active_dashboards.inc()
25
  logger.info("Dashboard Connected...")
26
 
@@ -28,7 +27,6 @@ async def dashboard_websocket(websocket: WebSocket):
28
  await pubsub.subscribe("dashboard_stream")
29
 
30
  try:
31
-
32
  while True:
33
  message = await pubsub.get_message(ignore_subscribe_messages=True)
34
 
@@ -36,8 +34,7 @@ async def dashboard_websocket(websocket: WebSocket):
36
  logger.debug("Sending updates to Dashboard...")
37
  await websocket.send_text(message["data"])
38
 
39
- await asyncio.sleep(0.01) # giving time to detect server disconnection.
40
-
41
 
42
  except WebSocketDisconnect:
43
  logger.warn("Dashboard Disconnected Normally...")
@@ -49,4 +46,4 @@ async def dashboard_websocket(websocket: WebSocket):
49
  finally:
50
  active_dashboards.dec()
51
  await pubsub.unsubscribe("dashboard_stream")
52
- await pubsub.close()
 
1
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
2
+ from api.routers.metrics import active_dashboards
3
  import asyncio
 
 
4
 
5
  router = APIRouter()
6
 
7
+
8
  @router.websocket("/stream")
9
  async def dashboard_websocket(websocket: WebSocket):
10
  """
11
+ WebScoket sending updates to the dashboard.
12
 
13
  url: ws://127.0.0.1:8000/dashboard/stream
14
  """
 
16
  logger = state.logger
17
  redis = state.redis
18
 
19
+ # Accept the client connection.
20
  await websocket.accept()
21
 
22
+ # Logging and tracking
23
  active_dashboards.inc()
24
  logger.info("Dashboard Connected...")
25
 
 
27
  await pubsub.subscribe("dashboard_stream")
28
 
29
  try:
 
30
  while True:
31
  message = await pubsub.get_message(ignore_subscribe_messages=True)
32
 
 
34
  logger.debug("Sending updates to Dashboard...")
35
  await websocket.send_text(message["data"])
36
 
37
+ await asyncio.sleep(0.01) # giving time to detect server disconnection.
 
38
 
39
  except WebSocketDisconnect:
40
  logger.warn("Dashboard Disconnected Normally...")
 
46
  finally:
47
  active_dashboards.dec()
48
  await pubsub.unsubscribe("dashboard_stream")
49
+ await pubsub.close()
api/routers/health.py CHANGED
@@ -1,6 +1,6 @@
1
- # Very simple and important file, uesd to check the api health, if it return 200 everything is great, otherwise, there is an issue.
2
- # This file is being used mostly in HTTP and not websockets.
3
- # Health check is being used for example by docker, to check is dependencies are working fine, if not, he might restart.
4
 
5
  from requests import Request
6
  from http import HTTPStatus
@@ -11,12 +11,13 @@ from api.routers.metrics import active_cameras
11
 
12
  router = APIRouter()
13
 
 
14
  @router.get("/")
15
  @router.get("/live")
16
  async def live_check(response: Response):
17
  """
18
- Prove that the process is running, No logic requried here.
19
- Confirming that the server is not dead.
20
  It is fails, container killed and restarted..
21
  Has to be very cheap.
22
  """
@@ -25,14 +26,15 @@ async def live_check(response: Response):
25
  return {
26
  "status": "live",
27
  "active_cameras": active_cameras._value.get(),
28
- "timestamp": datetime.now().isoformat()
29
- }
 
30
 
31
  @router.get("/ready")
32
  async def ready_check(response: Response, request: Request):
33
  """
34
- Checck if parts work here, ex. are data readable.
35
- Are data readable here.
36
  Also can this instance accept traffic right now, or send them to another healthy instance.
37
  """
38
 
@@ -43,7 +45,7 @@ async def ready_check(response: Response, request: Request):
43
  try:
44
  await request.app.state.redis.ping()
45
  checks["redis"] = "Good"
46
- except Exception as e:
47
  checks["redis"] = "unreachable"
48
  healthy = False
49
 
@@ -55,13 +57,15 @@ async def ready_check(response: Response, request: Request):
55
  checks["detection_model"] = "can't load"
56
  healthy = False
57
 
58
- checks["active_cameras"] = list(await request.app.state.redis.smembers("cameras:active"))
 
 
59
 
60
  response.status_code = HTTPStatus.OK if healthy else HTTPStatus.SERVICE_UNAVAILABLE
61
 
62
  return {
63
  "status": "ready" if healthy else "degraded",
64
  "checks": checks,
65
- "timestamp": datetime.now().isoformat(), # Sending the time also is a good practise
66
  "version": "1.0.0",
67
- }
 
1
+ # Very simple and important file, used to check the API health; if it returns 200 everything is fine, otherwise there is an issue.
2
+ # This file is being used mostly in HTTP and not websockets.
3
+ # Health check is used, for example, by Docker to check if dependencies are working fine; if not, it may restart the container.
4
 
5
  from requests import Request
6
  from http import HTTPStatus
 
11
 
12
  router = APIRouter()
13
 
14
+
15
  @router.get("/")
16
  @router.get("/live")
17
  async def live_check(response: Response):
18
  """
19
+ Prove that the process is running; no logic required here.
20
+ Confirming that the server is not dead.
21
  It is fails, container killed and restarted..
22
  Has to be very cheap.
23
  """
 
26
  return {
27
  "status": "live",
28
  "active_cameras": active_cameras._value.get(),
29
+ "timestamp": datetime.now().isoformat(),
30
+ }
31
+
32
 
33
  @router.get("/ready")
34
  async def ready_check(response: Response, request: Request):
35
  """
36
+ Check if parts work here, e.g. is data readable.
37
+ Are data readable here.
38
  Also can this instance accept traffic right now, or send them to another healthy instance.
39
  """
40
 
 
45
  try:
46
  await request.app.state.redis.ping()
47
  checks["redis"] = "Good"
48
+ except Exception:
49
  checks["redis"] = "unreachable"
50
  healthy = False
51
 
 
57
  checks["detection_model"] = "can't load"
58
  healthy = False
59
 
60
+ checks["active_cameras"] = list(
61
+ await request.app.state.redis.smembers("cameras:active")
62
+ )
63
 
64
  response.status_code = HTTPStatus.OK if healthy else HTTPStatus.SERVICE_UNAVAILABLE
65
 
66
  return {
67
  "status": "ready" if healthy else "degraded",
68
  "checks": checks,
69
+ "timestamp": datetime.now().isoformat(), # Sending the time also is a good practise
70
  "version": "1.0.0",
71
+ }
api/routers/metrics.py CHANGED
@@ -1,45 +1,35 @@
1
- # Prometheus is for real-time system health.
2
  # Grafana visualize the output of Prometheus
3
  # This is considered as Monitoring
4
- from prometheus_client import Counter, Histogram, Gauge, make_asgi_app
5
 
6
  metrics_asgi_app = make_asgi_app()
7
 
8
 
9
  active_cameras = Gauge(
10
- "active_camera_connections",
11
- "Number of Currently Connected camera websockets"
12
  )
13
 
14
  active_dashboards = Gauge(
15
- "active_dashboards",
16
- "Number of active dashboards which fetching data"
17
  )
18
 
19
  frame_processing_duration_seconds = Histogram(
20
- "frame_processing_duration_seconds",
21
- "Time to process one frame",
22
- ["camera_id"]
23
  )
24
 
25
  decode_duration_seconds = Histogram(
26
- "decode_duration_seconds",
27
- "Time to decode one image",
28
- ["camera_id"]
29
  )
30
  detection_duration_seconds = Histogram(
31
- "detection_duration_seconds",
32
- "Time to detect",
33
- ["camera_id"]
34
  )
35
  depth_duration_seconds = Histogram(
36
- "depth_duration_seconds",
37
- "Time to calculate the depth",
38
- ["camera_id"]
39
  )
40
 
41
 
42
  cpu_usage = Gauge("cpu_usage_percent", "CPU usage %")
43
  mem_usage = Gauge("mem_usage_percent", "mem usage %")
44
 
45
- active_workers = Gauge("active_workers", "Active threads")
 
1
+ # Prometheus is for real-time system health.
2
  # Grafana visualize the output of Prometheus
3
  # This is considered as Monitoring
4
+ from prometheus_client import Histogram, Gauge, make_asgi_app
5
 
6
  metrics_asgi_app = make_asgi_app()
7
 
8
 
9
  active_cameras = Gauge(
10
+ "active_camera_connections", "Number of Currently Connected camera websockets"
 
11
  )
12
 
13
  active_dashboards = Gauge(
14
+ "active_dashboards", "Number of active dashboards which fetching data"
 
15
  )
16
 
17
  frame_processing_duration_seconds = Histogram(
18
+ "frame_processing_duration_seconds", "Time to process one frame", ["camera_id"]
 
 
19
  )
20
 
21
  decode_duration_seconds = Histogram(
22
+ "decode_duration_seconds", "Time to decode one image", ["camera_id"]
 
 
23
  )
24
  detection_duration_seconds = Histogram(
25
+ "detection_duration_seconds", "Time to detect", ["camera_id"]
 
 
26
  )
27
  depth_duration_seconds = Histogram(
28
+ "depth_duration_seconds", "Time to calculate the depth", ["camera_id"]
 
 
29
  )
30
 
31
 
32
  cpu_usage = Gauge("cpu_usage_percent", "CPU usage %")
33
  mem_usage = Gauge("mem_usage_percent", "mem usage %")
34
 
35
+ active_workers = Gauge("active_workers", "Active threads")
config/settings.py CHANGED
@@ -1,9 +1,17 @@
1
  from pathlib import Path
2
  from typing import Literal, List
3
  from pydantic import BaseModel
4
- from pydantic_settings import BaseSettings, DotEnvSettingsSource, EnvSettingsSource, SettingsConfigDict, PydanticBaseSettingsSource, YamlConfigSettingsSource
 
 
 
 
 
 
 
5
  import yaml
6
 
 
7
  def join_tag(loader, node):
8
  """
9
  Help joining pathes in config.YAML directly.
@@ -12,16 +20,20 @@ def join_tag(loader, node):
12
  path = Path(*(str(part) for part in parts)).resolve()
13
  return str(path)
14
 
15
- # It didn't work before, After some research, .SafeLoaded is unmentioned must for my case.
 
16
  yaml.SafeLoader.add_constructor("!join", join_tag)
17
 
 
18
  class IntervalsConfig(BaseModel):
19
  system_metrics_seconds: float
20
  frames_summary_every: int
21
  realtime_updates_every: float
22
 
 
23
  class YoloConfig(BaseModel):
24
  """Contains yolo configurations"""
 
25
  model_name: str
26
  classes: List[str]
27
  batch_size: int
@@ -30,13 +42,17 @@ class YoloConfig(BaseModel):
30
  augment: bool
31
  data_path: str
32
 
 
33
  class SecurityDetector(BaseModel):
34
  "Contains Security Detectors like Smoke - Fire"
 
35
  model_name: str
36
  classes: List[str]
37
 
 
38
  class DepthConfig(BaseModel):
39
  "Contains depths estimation configurations"
 
40
  model_name: str
41
  device: Literal["cuda", "cpu"]
42
  encoder: Literal["vits", "vitb", "vitl", "vitg"]
@@ -49,45 +65,47 @@ class AppConfig(BaseSettings):
49
  - Override values with .env
50
  """
51
 
52
- # Note that it doesn't show error, Take care.
53
  model_config = SettingsConfigDict(
54
  env_file=Path(__file__).parent / ".env",
55
  env_file_encoding="utf-8",
56
  yaml_file=Path(__file__).parent / "config.yaml",
57
- extra="ignore" # Ignore other settings in yaml and env as they are not mentioedhere
58
  )
59
 
60
- project_name:str
61
- project_desc:str
62
  task: Literal["indoor", "outdoor"]
63
 
64
  yolo: YoloConfig
65
  security_detector: SecurityDetector
66
  depth: DepthConfig
67
  intervals: IntervalsConfig
68
- redis_url:str
69
 
70
  @classmethod
71
- def settings_customise_sources(cls,
 
72
  settings_cls: type[BaseSettings], # Base param.
73
- **kwargs
74
- ) -> tuple[PydanticBaseSettingsSource, ...] :
75
  """
76
  Once you use this, no need to use load_config, it is already the same.
77
- But this time it fixs the priority part, order by parameters priority.
78
  """
79
 
80
  # Order by priority (first, more important)
81
  return (
82
- DotEnvSettingsSource(settings_cls), # Most important
83
- EnvSettingsSource(settings_cls), # This allow for ex. hugging face to override .env values with its values.
84
- YamlConfigSettingsSource(settings_cls),
85
- ) # The return must be a tuple
 
 
86
 
87
 
88
- if __name__ == "__main__":
89
-
90
- # Trying to checking both yaml and .env. This works really fine now.
91
  config = AppConfig()
92
  print(config.model_dump())
93
- print(config.model_dump()["project_name"])
 
1
  from pathlib import Path
2
  from typing import Literal, List
3
  from pydantic import BaseModel
4
+ from pydantic_settings import (
5
+ BaseSettings,
6
+ DotEnvSettingsSource,
7
+ EnvSettingsSource,
8
+ SettingsConfigDict,
9
+ PydanticBaseSettingsSource,
10
+ YamlConfigSettingsSource,
11
+ )
12
  import yaml
13
 
14
+
15
  def join_tag(loader, node):
16
  """
17
  Help joining pathes in config.YAML directly.
 
20
  path = Path(*(str(part) for part in parts)).resolve()
21
  return str(path)
22
 
23
+
24
+ # It didn't work before, After some research, .SafeLoaded is unmentioned must for my case.
25
  yaml.SafeLoader.add_constructor("!join", join_tag)
26
 
27
+
28
  class IntervalsConfig(BaseModel):
29
  system_metrics_seconds: float
30
  frames_summary_every: int
31
  realtime_updates_every: float
32
 
33
+
34
  class YoloConfig(BaseModel):
35
  """Contains yolo configurations"""
36
+
37
  model_name: str
38
  classes: List[str]
39
  batch_size: int
 
42
  augment: bool
43
  data_path: str
44
 
45
+
46
  class SecurityDetector(BaseModel):
47
  "Contains Security Detectors like Smoke - Fire"
48
+
49
  model_name: str
50
  classes: List[str]
51
 
52
+
53
  class DepthConfig(BaseModel):
54
  "Contains depths estimation configurations"
55
+
56
  model_name: str
57
  device: Literal["cuda", "cpu"]
58
  encoder: Literal["vits", "vitb", "vitl", "vitg"]
 
65
  - Override values with .env
66
  """
67
 
68
+ # Note that it doesn't show error, Take care.
69
  model_config = SettingsConfigDict(
70
  env_file=Path(__file__).parent / ".env",
71
  env_file_encoding="utf-8",
72
  yaml_file=Path(__file__).parent / "config.yaml",
73
+ extra="ignore", # Ignore other settings in yaml and env as they are not mentioedhere
74
  )
75
 
76
+ project_name: str
77
+ project_desc: str
78
  task: Literal["indoor", "outdoor"]
79
 
80
  yolo: YoloConfig
81
  security_detector: SecurityDetector
82
  depth: DepthConfig
83
  intervals: IntervalsConfig
84
+ redis_url: str
85
 
86
  @classmethod
87
+ def settings_customise_sources(
88
+ cls,
89
  settings_cls: type[BaseSettings], # Base param.
90
+ **kwargs,
91
+ ) -> tuple[PydanticBaseSettingsSource, ...]:
92
  """
93
  Once you use this, no need to use load_config, it is already the same.
94
+ But this time it fixes the priority part, ordered by source priority.
95
  """
96
 
97
  # Order by priority (first, more important)
98
  return (
99
+ DotEnvSettingsSource(settings_cls), # Most important
100
+ EnvSettingsSource(
101
+ settings_cls
102
+ ), # This allow for ex. hugging face to override .env values with its values.
103
+ YamlConfigSettingsSource(settings_cls),
104
+ ) # The return must be a tuple
105
 
106
 
107
+ if __name__ == "__main__":
108
+ # Trying to checking both yaml and .env. This works really fine now.
 
109
  config = AppConfig()
110
  print(config.model_dump())
111
+ print(config.model_dump()["project_name"])
contracts/camera_metadata.py CHANGED
@@ -1,11 +1,13 @@
1
  from typing import List
2
  from pydantic import BaseModel
3
 
 
4
  class DetectionMetadata(BaseModel):
5
  depth: float
6
  xRatio: float
7
 
 
8
  class CameraMetadata(BaseModel):
9
  camera_id: str
10
  is_danger: bool = False
11
- detection_metadata: List[DetectionMetadata]
 
1
  from typing import List
2
  from pydantic import BaseModel
3
 
4
+
5
class DetectionMetadata(BaseModel):
    """Per-detection data published to the dashboard stream."""

    # Depth value computed for the detection's box center.
    depth: float
    # Horizontal box-center position expressed as a ratio of the image width.
    xRatio: float
8
 
9
+
10
class CameraMetadata(BaseModel):
    """Per-camera payload published to Redis for the dashboard."""

    camera_id: str
    # True when the safety detector returned any detections for the frame.
    is_danger: bool = False
    detection_metadata: List[DetectionMetadata]
domain/detection_box_center.py ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
def calculate_detection_box_center(detections, image_width: float):
    """Compute pixel centers and horizontal center ratios for detection boxes.

    Args:
        detections: iterable of detection boxes, each exposing an ``xyxy``
            tuple ``(xmin, ymin, xmax, ymax)``. The caller passes the list of
            boxes directly (``detections.detections`` at the call site), so
            this function iterates its argument as-is. The previous version
            dereferenced ``.detections`` a second time, which would raise
            AttributeError on a plain list.
        image_width: width of the source image in pixels, used to express the
            x-center as a ratio in [0, 1].

    Returns:
        A tuple ``(boxes_center, boxes_center_ratio)`` where ``boxes_center``
        is a list of ``(int(x), int(y))`` pixel centers and
        ``boxes_center_ratio`` is a list of ``xcenter / image_width`` floats.
        Both lists are empty when ``detections`` is empty.
    """
    boxes_center = []
    boxes_center_ratio = []
    for box in detections:
        xmin, ymin, xmax, ymax = box.xyxy
        xcenter = (xmin + xmax) / 2
        ycenter = (ymin + ymax) / 2
        boxes_center.append((int(xcenter), int(ycenter)))
        boxes_center_ratio.append(xcenter / image_width)

    return (boxes_center, boxes_center_ratio)
domain/logger.py CHANGED
@@ -2,20 +2,20 @@ from abc import ABC, abstractmethod
2
 
3
  class Logger(ABC):
4
  @abstractmethod
5
- def info(self, msg:str, **kwargs):
6
  pass
7
 
8
- def debug(self, msg:str, **kwargs):
9
  pass
10
 
11
  @abstractmethod
12
- def error(self, msg:str, **kwargs):
13
  pass
14
 
15
  @abstractmethod
16
- def warn(self, msg:str, **kwargs):
17
  pass
18
 
19
  @abstractmethod
20
- def exception(self, msg:str, **kwargs):
21
  pass
 
2
 
3
class Logger(ABC):
    """Abstract logging interface (dependency-inversion port).

    Application code depends on this interface; a concrete adapter
    (e.g. the structlog-backed StructLogger) supplies the implementation.
    """

    @abstractmethod
    def info(self, msg: str, **kwargs):
        pass

    # NOTE(review): `debug` is the only method NOT marked @abstractmethod,
    # so subclasses silently inherit a no-op — confirm this is intentional.
    def debug(self, msg: str, **kwargs):
        pass

    @abstractmethod
    def error(self, msg: str, **kwargs):
        pass

    @abstractmethod
    def warn(self, msg: str, **kwargs):
        pass

    @abstractmethod
    def exception(self, msg: str, **kwargs):
        pass
infra/logger_structlog.py CHANGED
@@ -4,36 +4,42 @@ from domain.logger import Logger
4
  from pathlib import Path
5
  import logging
6
 
7
- # Don't forget to keep logs.json file meaningful.
 
8
  def setup_logging(logs_path: Path | str):
9
  # log_file = open(logs_path, "a", encoding="utf-8")
10
  structlog.configure(
11
- processors = [
12
  structlog.processors.StackInfoRenderer(), # Stack strace, showing the exact source of errors.
13
  structlog.processors.format_exc_info, # for Exceptions in JSON
14
  structlog.processors.add_log_level, # Adding log level (info, warning, error)
15
- structlog.processors.TimeStamper(fmt="iso", utc=True), # Adding ISO timestamp
 
 
16
  structlog.processors.JSONRenderer(), # Makes JSON outputs
17
  ],
18
- wrapper_class=structlog.make_filtering_bound_logger(logging.INFO), # Profiling info and higher.
 
 
19
  # logger_factory = structlog.WriteLoggerFactory(file=log_file), # Save in file instead of terminal
20
  cache_logger_on_first_use=True, # Caching being used for optimization
21
  )
22
 
 
23
  class StructLogger(Logger):
24
- def __init__(self, settings:AppConfig):
25
  setup_logging(logs_path="")
26
  self._logger = structlog.get_logger()
27
 
28
- def info(self, message:str, **kwargs):
29
  print(message)
30
  self._logger.info(message, **kwargs)
31
 
32
- def debug(self, message:str, **kwargs):
33
  print(message)
34
  self._logger.debug(message, **kwargs)
35
 
36
- def error(self, message:str, **kwargs):
37
  print(message)
38
  self._logger.error(message, **kwargs)
39
 
@@ -42,6 +48,6 @@ class StructLogger(Logger):
42
  print(message)
43
  self._logger.warn(message, **kwargs)
44
 
45
- def exception(self, message:str, **kwargs):
46
  print(message)
47
- self._logger.exception(message, **kwargs)
 
4
  from pathlib import Path
5
  import logging
6
 
7
+
8
+ # Don't forget to keep logs.json file meaningful.
9
  def setup_logging(logs_path: Path | str):
10
  # log_file = open(logs_path, "a", encoding="utf-8")
11
  structlog.configure(
12
+ processors=[
13
  structlog.processors.StackInfoRenderer(), # Stack strace, showing the exact source of errors.
14
  structlog.processors.format_exc_info, # for Exceptions in JSON
15
  structlog.processors.add_log_level, # Adding log level (info, warning, error)
16
+ structlog.processors.TimeStamper(
17
+ fmt="iso", utc=True
18
+ ), # Adding ISO timestamp
19
  structlog.processors.JSONRenderer(), # Makes JSON outputs
20
  ],
21
+ wrapper_class=structlog.make_filtering_bound_logger(
22
+ logging.INFO
23
+ ), # Profiling info and higher.
24
  # logger_factory = structlog.WriteLoggerFactory(file=log_file), # Save in file instead of terminal
25
  cache_logger_on_first_use=True, # Caching being used for optimization
26
  )
27
 
28
+
29
  class StructLogger(Logger):
30
+ def __init__(self, settings: AppConfig):
31
  setup_logging(logs_path="")
32
  self._logger = structlog.get_logger()
33
 
34
+ def info(self, message: str, **kwargs):
35
  print(message)
36
  self._logger.info(message, **kwargs)
37
 
38
+ def debug(self, message: str, **kwargs):
39
  print(message)
40
  self._logger.debug(message, **kwargs)
41
 
42
+ def error(self, message: str, **kwargs):
43
  print(message)
44
  self._logger.error(message, **kwargs)
45
 
 
48
  print(message)
49
  self._logger.warn(message, **kwargs)
50
 
51
+ def exception(self, message: str, **kwargs):
52
  print(message)
53
+ self._logger.exception(message, **kwargs)
infra/system_metrics.py CHANGED
@@ -1,19 +1,21 @@
1
  from domain.logger import Logger
2
  import psutil
3
  import asyncio
4
- from api.routers.metrics import active_workers, cpu_usage, mem_usage
5
 
6
- async def log_system_metrics(logger:Logger, logger_interval_sec:float):
 
7
  while True:
8
  cpu = psutil.cpu_percent(interval=1)
9
  mem = psutil.virtual_memory()
10
 
11
  # Structlog Logging
12
- logger.info("System Metrics",
 
13
  cpu_percent=cpu,
14
  memtory_percent=mem.percent,
15
  memory_used_gb=round(mem.used / (1024**3), 2),
16
- memory_total_gb=round(mem.total / (1024**3), 2)
17
  )
18
 
19
  # Prometheus
@@ -22,4 +24,4 @@ async def log_system_metrics(logger:Logger, logger_interval_sec:float):
22
 
23
  # active_workers.set()
24
 
25
- await asyncio.sleep(logger_interval_sec)
 
1
  from domain.logger import Logger
2
  import psutil
3
  import asyncio
4
+ from api.routers.metrics import cpu_usage, mem_usage
5
 
6
+
7
+ async def log_system_metrics(logger: Logger, logger_interval_sec: float):
8
  while True:
9
  cpu = psutil.cpu_percent(interval=1)
10
  mem = psutil.virtual_memory()
11
 
12
  # Structlog Logging
13
+ logger.info(
14
+ "System Metrics",
15
  cpu_percent=cpu,
16
  memtory_percent=mem.percent,
17
  memory_used_gb=round(mem.used / (1024**3), 2),
18
+ memory_total_gb=round(mem.total / (1024**3), 2),
19
  )
20
 
21
  # Prometheus
 
24
 
25
  # active_workers.set()
26
 
27
+ await asyncio.sleep(logger_interval_sec)
main.py CHANGED
@@ -3,7 +3,6 @@ from ai.depth.depth_anything import DepthAnything
3
  from ai.detectors.yolo_detector import YOLO_Detector
4
  from config.settings import AppConfig
5
  from api.routers.metrics import metrics_asgi_app
6
- from infra.system_metrics import log_system_metrics
7
  from api.routers import camera_stream
8
  from api.routers import dashboard_stream
9
  from api.routers import health
@@ -23,29 +22,39 @@ async def lifespan(app: FastAPI):
23
  """
24
  This is on_event("startup") new alternative, Make sure you load models here.
25
  """
26
-
27
  settings = AppConfig()
28
  logger = StructLogger(settings=settings)
29
  # Using this way to can store data. it is acts as a dict which holds instances
30
  app.state.logger = logger
31
  app.state.settings = settings
32
  app.state.mlflow_run_id = parent_run.info.run_id
33
-
34
  logger.info("Starting Server.... ")
35
- # asyncio.create_task(log_system_metrics(logger, logger_interval_sec=settings.intervals.system_metrics_seconds))
36
-
37
- detection_model_path = hf_fetch_model(repo_id="Ultralytics/YOLO26", filename=settings.yolo.model_name)
 
 
38
  app.state.detection_model = YOLO_Detector(detection_model_path)
39
 
40
- depth_model_path = hf_fetch_model(repo_id="depth-anything/Depth-Anything-V2-Small", filename=settings.depth.model_name)
41
- app.state.depth_model = DepthAnything(encoder=settings.depth.encoder, depth_model_path=depth_model_path, DEVICE=settings.depth.device)
 
 
 
 
 
 
 
42
 
43
- safety_detection_path = hf_fetch_model(repo_id="e1250/safety_detection", filename=settings.security_detector.model_name)
 
 
44
  app.state.safety_detection_model = YOLO_Detector(safety_detection_path)
45
 
46
-
47
  app.state.redis = aioredis.from_url(settings.redis_url, decode_responses=True)
48
- # Checking connection to redis - TODO add to health check
49
  try:
50
  await app.state.redis.ping()
51
  logger.info("Redis connected successfully...")
@@ -60,8 +69,9 @@ async def lifespan(app: FastAPI):
60
  torch.cuda.empty_cache()
61
  await app.state.redis.close()
62
 
 
63
  # MLFlow setup
64
- dagshub.init(repo_owner='eslam760000', repo_name='p-tracking_system', mlflow=True)
65
  mlflow.set_tracking_uri("sqlite:///config/logs/mlflow.db")
66
  mlflow.set_experiment("realtime-detection-system")
67
  parent_run = mlflow.start_run(run_name="server_session")
@@ -71,20 +81,20 @@ app = FastAPI(
71
  title="Tracking System Backend",
72
  description="real-time frame processing API",
73
  version="0.1.0",
74
- lifespan=lifespan
75
- )
76
-
77
- app.add_middleware(
78
- TrustedHostMiddleware,
79
- allowed_hosts=["*"]
80
  )
81
 
 
 
82
  # Routes
83
- app.mount("/metrics", metrics_asgi_app) # Starting Prometheus server attached to my server.
 
 
84
  app.include_router(camera_stream.router, prefix="/detectors")
85
  app.include_router(dashboard_stream.router, prefix="/dashboard")
86
  app.include_router(health.router, prefix="/health")
87
 
 
88
  @app.get("/")
89
  async def root():
90
- return {"status": "Real-Time tracker backend is running..."}
 
3
  from ai.detectors.yolo_detector import YOLO_Detector
4
  from config.settings import AppConfig
5
  from api.routers.metrics import metrics_asgi_app
 
6
  from api.routers import camera_stream
7
  from api.routers import dashboard_stream
8
  from api.routers import health
 
22
  """
23
  This is on_event("startup") new alternative, Make sure you load models here.
24
  """
25
+
26
  settings = AppConfig()
27
  logger = StructLogger(settings=settings)
28
  # Using this way to can store data. it is acts as a dict which holds instances
29
  app.state.logger = logger
30
  app.state.settings = settings
31
  app.state.mlflow_run_id = parent_run.info.run_id
32
+
33
  logger.info("Starting Server.... ")
34
+ # asyncio.create_task(log_system_metrics(logger, logger_interval_sec=settings.intervals.system_metrics_seconds))
35
+
36
+ detection_model_path = hf_fetch_model(
37
+ repo_id="Ultralytics/YOLO26", filename=settings.yolo.model_name
38
+ )
39
  app.state.detection_model = YOLO_Detector(detection_model_path)
40
 
41
+ depth_model_path = hf_fetch_model(
42
+ repo_id="depth-anything/Depth-Anything-V2-Small",
43
+ filename=settings.depth.model_name,
44
+ )
45
+ app.state.depth_model = DepthAnything(
46
+ encoder=settings.depth.encoder,
47
+ depth_model_path=depth_model_path,
48
+ DEVICE=settings.depth.device,
49
+ )
50
 
51
+ safety_detection_path = hf_fetch_model(
52
+ repo_id="e1250/safety_detection", filename=settings.security_detector.model_name
53
+ )
54
  app.state.safety_detection_model = YOLO_Detector(safety_detection_path)
55
 
 
56
  app.state.redis = aioredis.from_url(settings.redis_url, decode_responses=True)
57
+ # Checking connection to redis - TODO add to health check
58
  try:
59
  await app.state.redis.ping()
60
  logger.info("Redis connected successfully...")
 
69
  torch.cuda.empty_cache()
70
  await app.state.redis.close()
71
 
72
+
73
  # MLFlow setup
74
+ dagshub.init(repo_owner="eslam760000", repo_name="p-tracking_system", mlflow=True)
75
  mlflow.set_tracking_uri("sqlite:///config/logs/mlflow.db")
76
  mlflow.set_experiment("realtime-detection-system")
77
  parent_run = mlflow.start_run(run_name="server_session")
 
81
  title="Tracking System Backend",
82
  description="real-time frame processing API",
83
  version="0.1.0",
84
+ lifespan=lifespan,
 
 
 
 
 
85
  )
86
 
87
+ app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])
88
+
89
  # Routes
90
+ app.mount(
91
+ "/metrics", metrics_asgi_app
92
+ ) # Starting Prometheus server attached to my server.
93
  app.include_router(camera_stream.router, prefix="/detectors")
94
  app.include_router(dashboard_stream.router, prefix="/dashboard")
95
  app.include_router(health.router, prefix="/health")
96
 
97
+
98
  @app.get("/")
99
  async def root():
100
+ return {"status": "Real-Time tracker backend is running..."}
utils/experiment.py CHANGED
@@ -2,11 +2,14 @@ from backend.config.settings import AppConfig
2
  import mlflow
3
 
4
  config = AppConfig()
 
 
5
  def log_config():
6
  mlflow.log_param("Detector", config.yolo.model_name)
7
  mlflow.log_param("Safety Model", config.security_detector.model_name)
8
  mlflow.log_param("Depth Model", config.depth.model_name)
9
 
10
- def log_metrics(metrics:dict):
 
11
  for k, v in metrics.items():
12
- mlflow.log_metric(k, v)
 
2
  import mlflow
3
 
4
  config = AppConfig()
5
+
6
+
7
  def log_config():
8
  mlflow.log_param("Detector", config.yolo.model_name)
9
  mlflow.log_param("Safety Model", config.security_detector.model_name)
10
  mlflow.log_param("Depth Model", config.depth.model_name)
11
 
12
+
13
+ def log_metrics(metrics: dict):
14
  for k, v in metrics.items():
15
+ mlflow.log_metric(k, v)