e1250 committed on
Commit
87aa1c0
·
1 Parent(s): 9c385e8

feat: adding testing into docker and pre-commit

Browse files
api/dependencies.py CHANGED
@@ -15,4 +15,4 @@ def get_safety_detection_model(request: HTTPConnection):
15
 
16
 
17
  def get_redis(request: HTTPConnection):
18
- return request.app.state.redis
 
15
 
16
 
17
  def get_redis(request: HTTPConnection):
18
+ return request.app.state.redis
api/routers/camera_stream.py CHANGED
@@ -1,6 +1,4 @@
1
- from backend.utils.profiling import profile_step
2
  from backend.services.pipeline import ProcessingPipeline
3
- from domain.detection_box_center import calculate_detection_box_center
4
  from api.dependencies import get_safety_detection_model
5
  from api.dependencies import get_detection_model, get_depth_model
6
  import asyncio
@@ -8,18 +6,10 @@ import itertools
8
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
9
  from api.routers.metrics import (
10
  active_cameras,
11
- decode_duration_seconds,
12
- depth_duration_seconds,
13
- detection_duration_seconds,
14
- frame_processing_duration_seconds,
15
  )
16
- from contracts.camera_metadata import CameraMetadata, DetectionMetadata
17
  import mlflow
18
  from utils.experiment import log_config
19
 
20
- import cv2 as cv
21
- import numpy as np
22
- import time
23
 
24
  router = APIRouter()
25
 
@@ -56,7 +46,6 @@ async def websocket_detect(
56
  step_counter = itertools.count()
57
  pipeline = ProcessingPipeline(detector, depth_model, safety_detector, redis)
58
 
59
-
60
  # Queue removing old images in case they were being stacked
61
  frame_queue: asyncio.Queue = asyncio.Queue(maxsize=1)
62
 
@@ -86,9 +75,13 @@ async def websocket_detect(
86
  frame_bytes = await frame_queue.get()
87
 
88
  try:
89
- results = await pipeline.run(camera_id, frame_bytes, next(step_counter))
 
 
90
  except Exception as e:
91
- logger.warn(f"Error happened while processing a frame in {camera_id}: {e}")
 
 
92
  logger.exception(e)
93
  continue
94
 
@@ -99,7 +92,9 @@ async def websocket_detect(
99
  logger.error(f"Processing Error: {e}", camera_id=camera_id)
100
  raise
101
 
102
- with mlflow.start_run(run_name=f"camera_{camera_id}", nested=True, parent_run_id=state.mlflow_run_id):
 
 
103
  log_config()
104
 
105
  try:
@@ -119,4 +114,4 @@ async def websocket_detect(
119
  await redis.srem(
120
  "cameras:active", camera_id
121
  ) # Remove the camera from redis connected cameras
122
- active_cameras.dec()
 
 
1
  from backend.services.pipeline import ProcessingPipeline
 
2
  from api.dependencies import get_safety_detection_model
3
  from api.dependencies import get_detection_model, get_depth_model
4
  import asyncio
 
6
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
7
  from api.routers.metrics import (
8
  active_cameras,
 
 
 
 
9
  )
 
10
  import mlflow
11
  from utils.experiment import log_config
12
 
 
 
 
13
 
14
  router = APIRouter()
15
 
 
46
  step_counter = itertools.count()
47
  pipeline = ProcessingPipeline(detector, depth_model, safety_detector, redis)
48
 
 
49
  # Queue removing old images in case they were being stacked
50
  frame_queue: asyncio.Queue = asyncio.Queue(maxsize=1)
51
 
 
75
  frame_bytes = await frame_queue.get()
76
 
77
  try:
78
+ results = await pipeline.run(
79
+ camera_id, frame_bytes, next(step_counter)
80
+ )
81
  except Exception as e:
82
+ logger.warn(
83
+ f"Error happened while processing a frame in {camera_id}: {e}"
84
+ )
85
  logger.exception(e)
86
  continue
87
 
 
92
  logger.error(f"Processing Error: {e}", camera_id=camera_id)
93
  raise
94
 
95
+ with mlflow.start_run(
96
+ run_name=f"camera_{camera_id}", nested=True, parent_run_id=state.mlflow_run_id
97
+ ):
98
  log_config()
99
 
100
  try:
 
114
  await redis.srem(
115
  "cameras:active", camera_id
116
  ) # Remove the camera from redis connected cameras
117
+ active_cameras.dec()
domain/detection_box_center.py CHANGED
@@ -1,5 +1,4 @@
1
- def calculate_detection_box_center(detections, image_width:float):
2
-
3
  boxes_center = []
4
  boxes_center_ratio = []
5
  for box in detections:
@@ -9,4 +8,4 @@ def calculate_detection_box_center(detections, image_width:float):
9
  boxes_center.append((int(xcenter), int(ycenter)))
10
  boxes_center_ratio.append(xcenter / image_width)
11
 
12
- return (boxes_center, boxes_center_ratio)
 
1
+ def calculate_detection_box_center(detections, image_width: float):
 
2
  boxes_center = []
3
  boxes_center_ratio = []
4
  for box in detections:
 
8
  boxes_center.append((int(xcenter), int(ycenter)))
9
  boxes_center_ratio.append(xcenter / image_width)
10
 
11
+ return (boxes_center, boxes_center_ratio)
domain/logger.py CHANGED
@@ -1,5 +1,6 @@
1
  from abc import ABC, abstractmethod
2
 
 
3
  class Logger(ABC):
4
  @abstractmethod
5
  def info(self, msg: str, **kwargs):
@@ -18,4 +19,4 @@ class Logger(ABC):
18
 
19
  @abstractmethod
20
  def exception(self, msg: str, **kwargs):
21
- pass
 
1
  from abc import ABC, abstractmethod
2
 
3
+
4
  class Logger(ABC):
5
  @abstractmethod
6
  def info(self, msg: str, **kwargs):
 
19
 
20
  @abstractmethod
21
  def exception(self, msg: str, **kwargs):
22
+ pass
services/pipeline.py CHANGED
@@ -20,9 +20,12 @@ class ProcessingPipeline:
20
  def _decode_frame(self, fb):
21
  return cv.imdecode(np.frombuffer(fb, np.uint8), cv.IMREAD_COLOR)
22
 
23
- def _camera_metadata(self, camera_id, safety_detection, depth_points, boxes_center_ratio) -> CameraMetadata:
 
 
24
  detection_metadata = [
25
- DetectionMetadata(depth=depth, xRatio=xRatio) for depth, xRatio in zip(depth_points, boxes_center_ratio)
 
26
  ]
27
  metadata = CameraMetadata(
28
  camera_id=camera_id,
@@ -31,25 +34,48 @@ class ProcessingPipeline:
31
  )
32
  return metadata
33
 
34
- async def run(self, camera_id:str, frame_bytes, frame_count):
35
  loop = asyncio.get_running_loop()
36
 
37
- with profile_step("frame_processing_time", decode_duration_seconds, camera_id, frame_count):
38
- frame_bytes = await loop.run_in_executor(None, self._decode_frame, frame_bytes)
 
 
 
 
39
 
40
- with profile_step("detection_duration_seconds", detection_duration_seconds, camera_id, frame_count):
41
- detection_task = loop.run_in_executor(None, self.detector.detect, frame_bytes)
42
- safety_task = loop.run_in_executor(None, self.safety_detector.detect, frame_bytes)
43
- detections, safety_detection = await asyncio.gather(detection_task, safety_task)
 
 
 
 
 
 
 
 
 
 
 
44
 
45
- boxes_center, boxes_center_ratio = calculate_detection_box_center(detections.detections, frame_bytes.shape[1])
 
 
46
 
47
  depth_points = []
48
- if boxes_center:
49
- with profile_step("depth_duration_seconds", depth_duration_seconds, camera_id, frame_count):
50
- depth_points = await loop.run_in_executor(None, self.depth_model.calculate_depth, frame_bytes, boxes_center)
 
 
 
 
51
 
52
- metadata = self._camera_metadata(camera_id, safety_detection, depth_points, boxes_center_ratio)
 
 
53
 
54
  await self.redis.publish("dashboard_stream", metadata.model_dump_json())
55
  # Even if the camera was disconnected, redis is still going to show its data, which is not accurate.
@@ -61,4 +87,4 @@ class ProcessingPipeline:
61
  )
62
 
63
  # Note that JSONResponse doesn't work here, as it is for HTTP
64
- return {"status": 200, "camera_id": camera_id}
 
20
  def _decode_frame(self, fb):
21
  return cv.imdecode(np.frombuffer(fb, np.uint8), cv.IMREAD_COLOR)
22
 
23
+ def _camera_metadata(
24
+ self, camera_id, safety_detection, depth_points, boxes_center_ratio
25
+ ) -> CameraMetadata:
26
  detection_metadata = [
27
+ DetectionMetadata(depth=depth, xRatio=xRatio)
28
+ for depth, xRatio in zip(depth_points, boxes_center_ratio)
29
  ]
30
  metadata = CameraMetadata(
31
  camera_id=camera_id,
 
34
  )
35
  return metadata
36
 
37
+ async def run(self, camera_id: str, frame_bytes, frame_count):
38
  loop = asyncio.get_running_loop()
39
 
40
+ with profile_step(
41
+ "frame_processing_time", decode_duration_seconds, camera_id, frame_count
42
+ ):
43
+ frame_bytes = await loop.run_in_executor(
44
+ None, self._decode_frame, frame_bytes
45
+ )
46
 
47
+ with profile_step(
48
+ "detection_duration_seconds",
49
+ detection_duration_seconds,
50
+ camera_id,
51
+ frame_count,
52
+ ):
53
+ detection_task = loop.run_in_executor(
54
+ None, self.detector.detect, frame_bytes
55
+ )
56
+ safety_task = loop.run_in_executor(
57
+ None, self.safety_detector.detect, frame_bytes
58
+ )
59
+ detections, safety_detection = await asyncio.gather(
60
+ detection_task, safety_task
61
+ )
62
 
63
+ boxes_center, boxes_center_ratio = calculate_detection_box_center(
64
+ detections.detections, frame_bytes.shape[1]
65
+ )
66
 
67
  depth_points = []
68
+ if boxes_center:
69
+ with profile_step(
70
+ "depth_duration_seconds", depth_duration_seconds, camera_id, frame_count
71
+ ):
72
+ depth_points = await loop.run_in_executor(
73
+ None, self.depth_model.calculate_depth, frame_bytes, boxes_center
74
+ )
75
 
76
+ metadata = self._camera_metadata(
77
+ camera_id, safety_detection, depth_points, boxes_center_ratio
78
+ )
79
 
80
  await self.redis.publish("dashboard_stream", metadata.model_dump_json())
81
  # Even if the camera was disconnected, redis is still going to show its data, which is not accurate.
 
87
  )
88
 
89
  # Note that JSONResponse doesn't work here, as it is for HTTP
90
+ return {"status": 200, "camera_id": camera_id}
utils/profiling.py CHANGED
@@ -2,6 +2,7 @@ from contextlib import contextmanager
2
  import time
3
  import mlflow
4
 
 
5
  @contextmanager
6
  def profile_step(expr_name: str, prometheus_logger, camera_id, frame_count=None):
7
  """With statement utility to time block of code"""
@@ -14,7 +15,7 @@ def profile_step(expr_name: str, prometheus_logger, camera_id, frame_count=None)
14
  duration = round(time.time() - start_time, 4)
15
  prometheus_logger.labels(camera_id).observe(duration)
16
  mlflow.log_metric(
17
- expr_name,
18
- duration,
19
- frame_count,
20
- )
 
2
  import time
3
  import mlflow
4
 
5
+
6
  @contextmanager
7
  def profile_step(expr_name: str, prometheus_logger, camera_id, frame_count=None):
8
  """With statement utility to time block of code"""
 
15
  duration = round(time.time() - start_time, 4)
16
  prometheus_logger.labels(camera_id).observe(duration)
17
  mlflow.log_metric(
18
+ expr_name,
19
+ duration,
20
+ frame_count,
21
+ )