e1250 committed on
Commit
3d237cb
·
1 Parent(s): 5e55742

fix: modification, code cleaning

Browse files
api/routers/camera_stream.py CHANGED
@@ -41,13 +41,10 @@ async def websocket_detect(
41
 
42
  # Logging and tracking action
43
  active_cameras.inc()
 
44
  logger.info(f"Client ID >>{camera_id}<< Connected...")
45
-
46
- if mlflow.active_run():
47
- mlflow.end_run()
48
- run = mlflow.start_run(run_name=f'camera_{camera_id}', nested=True)
49
  step_counter = itertools.count()
50
- log_config()
51
 
52
  loop = asyncio.get_running_loop()
53
  # Queue removing old images in case they were being stacked
@@ -71,7 +68,6 @@ async def websocket_detect(
71
  raise
72
 
73
  async def process_frames():
74
-
75
  try:
76
 
77
  logger.info(f"Camera {camera_id} start sending frames...")
@@ -84,12 +80,12 @@ async def websocket_detect(
84
 
85
  # Profiling
86
  t0 = time.time()
87
-
88
  image_array = await loop.run_in_executor(None, decode_frame, frame_bytes)
89
  decode_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
90
  mlflow.log_metric("frame_processing_time", round(time.time() - t0, 3), next(step_counter))
91
 
92
  # Apply detection models
 
93
  detection_task = loop.run_in_executor(None, detector.detect, image_array)
94
  safety_task = loop.run_in_executor(None, safety_detector.detect, image_array)
95
  detections, safety_detection = await asyncio.gather(detection_task, safety_task)
@@ -111,6 +107,7 @@ async def websocket_detect(
111
  boxes_center.append((int(xcenter), int(ycenter)))
112
  boxes_center_ratio.append(xcenter / image_array.shape[1])
113
 
 
114
  depth_points = await loop.run_in_executor(None, depth_model.calculate_depth, image_array, boxes_center) if boxes_center else []
115
  depth_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
116
  mlflow.log_metric("depth_duration_seconds", round(time.time() - t0, 3), next(step_counter))
@@ -133,22 +130,26 @@ async def websocket_detect(
133
  except Exception as e:
134
  logger.error(f"Processing Error: {e}", camera_id=camera_id)
135
  raise
 
 
 
136
 
137
- try:
138
- await asyncio.gather(
139
- receive_frames(),
140
- process_frames()
141
- )
142
-
143
- except WebSocketDisconnect:
144
- logger.warn(f"Client ID >>{camera_id}<< Disconnected Normally...")
145
 
146
- except Exception as e:
147
- logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
148
- traceback.print_exc() # This one is actually really better, it shows more details about the issue happened.
149
- # Also work on and create the logger.exception, as it directly controls printing more details about the issue happened.
150
- await websocket.close()
151
 
152
- finally:
153
- active_cameras.dec()
154
- mlflow.end_run()
 
 
 
 
 
 
 
 
41
 
42
  # Logging and tracking action
43
  active_cameras.inc()
44
+ await redis.sadd("cameras:active", camera_id) # Save connected camera name into redis
45
  logger.info(f"Client ID >>{camera_id}<< Connected...")
46
+
 
 
 
47
  step_counter = itertools.count()
 
48
 
49
  loop = asyncio.get_running_loop()
50
  # Queue removing old images in case they were being stacked
 
68
  raise
69
 
70
  async def process_frames():
 
71
  try:
72
 
73
  logger.info(f"Camera {camera_id} start sending frames...")
 
80
 
81
  # Profiling
82
  t0 = time.time()
 
83
  image_array = await loop.run_in_executor(None, decode_frame, frame_bytes)
84
  decode_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
85
  mlflow.log_metric("frame_processing_time", round(time.time() - t0, 3), next(step_counter))
86
 
87
  # Apply detection models
88
+ t0 = time.time()
89
  detection_task = loop.run_in_executor(None, detector.detect, image_array)
90
  safety_task = loop.run_in_executor(None, safety_detector.detect, image_array)
91
  detections, safety_detection = await asyncio.gather(detection_task, safety_task)
 
107
  boxes_center.append((int(xcenter), int(ycenter)))
108
  boxes_center_ratio.append(xcenter / image_array.shape[1])
109
 
110
+ t0 = time.time()
111
  depth_points = await loop.run_in_executor(None, depth_model.calculate_depth, image_array, boxes_center) if boxes_center else []
112
  depth_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
113
  mlflow.log_metric("depth_duration_seconds", round(time.time() - t0, 3), next(step_counter))
 
130
  except Exception as e:
131
  logger.error(f"Processing Error: {e}", camera_id=camera_id)
132
  raise
133
+
134
+ with mlflow.start_run(run_name=f'camera_{camera_id}', nested=True, parent_run_id=state.mlflow_run_id):
135
+ log_config()
136
 
137
+ try:
138
+ await asyncio.gather(
139
+ receive_frames(),
140
+ process_frames()
141
+ )
 
 
 
142
 
143
+ except WebSocketDisconnect:
144
+ logger.warn(f"Client ID >>{camera_id}<< Disconnected Normally...")
 
 
 
145
 
146
+ except Exception as e:
147
+ logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
148
+ logger.exception(e)
149
+ # This approach is better: it shows more details about the issue that happened.
150
+ # Also work on and create the logger.exception, as it directly controls printing more details about the issue happened.
151
+ await websocket.close()
152
+
153
+ finally:
154
+ await redis.srem("cameras:active", camera_id) # Remove the camera from redis connected cameras
155
+ active_cameras.dec()
api/routers/dashboard_stream.py CHANGED
@@ -44,7 +44,7 @@ async def dashboard_websocket(websocket: WebSocket):
44
 
45
  except Exception as e:
46
  logger.error(f"Dashboard Error: {e}")
47
- traceback.print_exc()
48
 
49
  finally:
50
  active_dashboards.dec()
 
44
 
45
  except Exception as e:
46
  logger.error(f"Dashboard Error: {e}")
47
+ logger.exception(e)
48
 
49
  finally:
50
  active_dashboards.dec()
api/routers/health.py CHANGED
@@ -2,6 +2,7 @@
2
  # This file is being used mostly in HTTP and not websockets.
3
  # Health checks are used, for example, by Docker to check whether dependencies are working fine; if not, it might restart.
4
 
 
5
  from http import HTTPStatus
6
  from datetime import datetime
7
  from fastapi import APIRouter, Response
@@ -28,15 +29,12 @@ async def live_check(response: Response):
28
  }
29
 
30
  @router.get("/ready")
31
- async def ready_check(response: Response):
32
  """
33
  Check if parts work here, e.g. is data readable.
34
  Are data readable here.
35
  Also can this instance accept traffic right now, or send them to another healthy instance.
36
  """
37
- # 1. Check database ping
38
- # 2. Check Redis or cache ping
39
- # 3. Queue connection or length
40
 
41
  checks = {}
42
  healthy = True
@@ -57,10 +55,12 @@ async def ready_check(response: Response):
57
  checks["detection_model"] = "can't load"
58
  healthy = False
59
 
 
 
60
  response.status_code = HTTPStatus.OK if healthy else HTTPStatus.SERVICE_UNAVAILABLE
61
 
62
  return {
63
- "status": "ready",
64
  "checks": checks,
65
  "timestamp": datetime.now().isoformat(), # Sending the time also is a good practise
66
  "version": "1.0.0",
 
2
  # This file is being used mostly in HTTP and not websockets.
3
  # Health checks are used, for example, by Docker to check whether dependencies are working fine; if not, it might restart.
4
 
5
+ from requests import Request
6
  from http import HTTPStatus
7
  from datetime import datetime
8
  from fastapi import APIRouter, Response
 
29
  }
30
 
31
  @router.get("/ready")
32
+ async def ready_check(response: Response, request: Request):
33
  """
34
  Check if parts work here, e.g. is data readable.
35
  Are data readable here.
36
  Also can this instance accept traffic right now, or send them to another healthy instance.
37
  """
 
 
 
38
 
39
  checks = {}
40
  healthy = True
 
55
  checks["detection_model"] = "can't load"
56
  healthy = False
57
 
58
+ checks["active_cameras"] = list(await request.app.state.redis.smembers("cameras:active"))
59
+
60
  response.status_code = HTTPStatus.OK if healthy else HTTPStatus.SERVICE_UNAVAILABLE
61
 
62
  return {
63
+ "status": "ready" if healthy else "degraded",
64
  "checks": checks,
65
  "timestamp": datetime.now().isoformat(), # Sending the time also is a good practise
66
  "version": "1.0.0",
contracts/camera_metadata.py CHANGED
@@ -8,4 +8,4 @@ class DetectionMetadata(BaseModel):
8
  class CameraMetadata(BaseModel):
9
  camera_id: str
10
  is_danger: bool = False
11
- detection_metadata: List
 
8
  class CameraMetadata(BaseModel):
9
  camera_id: str
10
  is_danger: bool = False
11
+ detection_metadata: List[DetectionMetadata]
main.py CHANGED
@@ -29,6 +29,7 @@ async def lifespan(app: FastAPI):
29
  # Using app.state this way lets us store data; it acts like a dict that holds instances.
30
  app.state.logger = logger
31
  app.state.settings = settings
 
32
 
33
  logger.info("Starting Server.... ")
34
  # asyncio.create_task(log_system_metrics(logger, logger_interval_sec=settings.intervals.system_metrics_seconds))
@@ -63,6 +64,7 @@ async def lifespan(app: FastAPI):
63
  dagshub.init(repo_owner='eslam760000', repo_name='p-tracking_system', mlflow=True)
64
  mlflow.set_tracking_uri("sqlite:///config/logs/mlflow.db")
65
  mlflow.set_experiment("realtime-detection-system")
 
66
  mlflow.enable_system_metrics_logging()
67
 
68
  app = FastAPI(
 
29
  # Using app.state this way lets us store data; it acts like a dict that holds instances.
30
  app.state.logger = logger
31
  app.state.settings = settings
32
+ app.state.mlflow_run_id = parent_run.info.run_id
33
 
34
  logger.info("Starting Server.... ")
35
  # asyncio.create_task(log_system_metrics(logger, logger_interval_sec=settings.intervals.system_metrics_seconds))
 
64
  dagshub.init(repo_owner='eslam760000', repo_name='p-tracking_system', mlflow=True)
65
  mlflow.set_tracking_uri("sqlite:///config/logs/mlflow.db")
66
  mlflow.set_experiment("realtime-detection-system")
67
+ parent_run = mlflow.start_run(run_name="server_session")
68
  mlflow.enable_system_metrics_logging()
69
 
70
  app = FastAPI(