import math
import os
import shutil
import time
from collections import deque
from pathlib import Path
from sys import platform

import cv2
import numpy as np
from numpy import random
import torch
import torch.backends.cudnn as cudnn

from utils.google_utils import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.general import (
    check_img_size, non_max_suppression, apply_classifier, scale_coords,
    xyxy2xywh, strip_optimizer)
from utils.plots import plot_one_box
from utils.torch_utils import select_device, load_classifier, time_synchronized

from models.models import *  # provides Darknet (YOLOR)

from deep_sort_pytorch.utils.parser import get_config
from deep_sort_pytorch.deep_sort import DeepSort

from byte_track.bytetracker import ByteTrack
from yolov7.yolov7_detector import YOLOv7Detector
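
# NOTE: this module assumes it runs from the root of a YOLOR checkout (utils/,
# models/, cfg/, data/coco.names) with deep_sort_pytorch/ cloned alongside it.
# byte_track/ and yolov7/ are project-local wrapper packages; their APIs are
# assumed from how they are used below, not from any published library.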


def load_classes(path):
    """Load class names from a file, one name per line."""
    with open(path, 'r') as f:
        names = f.read().split('\n')
    return list(filter(None, names))  # filter removes empty lines


names = load_classes('data/coco.names')


colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)

# Per-run tracking state, cleared at the start of each processing function.
data_deque = {}             # track id -> deque of recent trail points
speed_four_line_queue = {}  # track id -> list of speed estimates (km/h)
object_counter = {}         # class name -> number of counting-line crossings

# Virtual counting line as ((x1, y1), (x2, y2)) in pixel coordinates.
line2 = [(200, 500), (1050, 500)]
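
# NOTE: line2 is hard-coded for one particular camera view; adjust the
# endpoints for other videos or resolutions, or no crossings will be counted.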


def xyxy_to_xywh(*xyxy):
    """Convert a corner-format box (x1, y1, x2, y2) to center format (x_c, y_c, w, h)."""
    bbox_left = min([xyxy[0].item(), xyxy[2].item()])
    bbox_top = min([xyxy[1].item(), xyxy[3].item()])
    bbox_w = abs(xyxy[0].item() - xyxy[2].item())
    bbox_h = abs(xyxy[1].item() - xyxy[3].item())
    x_c = bbox_left + bbox_w / 2
    y_c = bbox_top + bbox_h / 2
    return x_c, y_c, bbox_w, bbox_h


def xyxy_to_tlwh(bbox_xyxy):
    """Convert corner-format boxes to top-left/width/height format (x1, y1, w, h)."""
    tlwh_bboxs = []
    for box in bbox_xyxy:
        x1, y1, x2, y2 = [int(v) for v in box]
        w = int(x2 - x1)
        h = int(y2 - y1)
        tlwh_bboxs.append([x1, y1, w, h])
    return tlwh_bboxs
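
# Box formats used in this file: the detectors emit corner boxes
# (x1, y1, x2, y2), the DeepSORT wrapper's update() expects center boxes
# (x_c, y_c, w, h), and the drawing code works with top-left (x, y, w, h).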


def compute_color_for_labels(label):
    """Return a fixed color for a few common classes and a palette-derived color otherwise."""
    if label == 0:  # person
        color = (85, 45, 255)
    elif label == 2:  # car
        color = (222, 82, 175)
    elif label == 3:  # motorcycle
        color = (0, 204, 255)
    elif label == 5:  # bus
        color = (0, 149, 255)
    else:
        color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
    return tuple(color)
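
# The class ids above (0, 2, 3, 5) assume the standard COCO ordering in
# data/coco.names: person, car, motorcycle, bus.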


def draw_border(img, pt1, pt2, color, thickness, r, d):
    """Draw a filled rectangle with rounded corners (used as the label background)."""
    x1, y1 = pt1
    x2, y2 = pt2

    # Top-left corner
    cv2.line(img, (x1 + r, y1), (x1 + r + d, y1), color, thickness)
    cv2.line(img, (x1, y1 + r), (x1, y1 + r + d), color, thickness)
    cv2.ellipse(img, (x1 + r, y1 + r), (r, r), 180, 0, 90, color, thickness)

    # Top-right corner
    cv2.line(img, (x2 - r, y1), (x2 - r - d, y1), color, thickness)
    cv2.line(img, (x2, y1 + r), (x2, y1 + r + d), color, thickness)
    cv2.ellipse(img, (x2 - r, y1 + r), (r, r), 270, 0, 90, color, thickness)

    # Bottom-left corner
    cv2.line(img, (x1 + r, y2), (x1 + r + d, y2), color, thickness)
    cv2.line(img, (x1, y2 - r), (x1, y2 - r - d), color, thickness)
    cv2.ellipse(img, (x1 + r, y2 - r), (r, r), 90, 0, 90, color, thickness)

    # Bottom-right corner
    cv2.line(img, (x2 - r, y2), (x2 - r - d, y2), color, thickness)
    cv2.line(img, (x2, y2 - r), (x2, y2 - r - d), color, thickness)
    cv2.ellipse(img, (x2 - r, y2 - r), (r, r), 0, 0, 90, color, thickness)

    # Fill the interior and smooth the corner joints
    cv2.rectangle(img, (x1 + r, y1), (x2 - r, y2), color, -1, cv2.LINE_AA)
    cv2.rectangle(img, (x1, y1 + r), (x2, y2 - r - d), color, -1, cv2.LINE_AA)

    cv2.circle(img, (x1 + r, y1 + r), 2, color, 12)
    cv2.circle(img, (x2 - r, y1 + r), 2, color, 12)
    cv2.circle(img, (x1 + r, y2 - r), 2, color, 12)
    cv2.circle(img, (x2 - r, y2 - r), 2, color, 12)

    return img


def UI_box(x, img, color=None, label=None, line_thickness=None):
    """Draw one bounding box, with a rounded label banner, on img."""
    tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line thickness
    color = color or [random.randint(0, 255) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)  # box outline

    if label:
        tf = max(tl - 1, 1)  # font thickness
        t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
        img = draw_border(img, (c1[0], c1[1] - t_size[1] - 3), (c1[0] + t_size[0], c1[1] + 3), color, 1, 8, 2)
        cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)


def estimateSpeed(location1, location2):
    """Estimate speed in km/h from the pixel displacement between two consecutive trail points."""
    d_pixels = math.sqrt(math.pow(location2[0] - location1[0], 2) + math.pow(location2[1] - location1[1], 2))
    ppm = 8  # pixels per meter; a scene-specific calibration constant
    d_meters = d_pixels / ppm
    time_constant = 15 * 3.6  # assumed frame rate (15 fps) times 3.6 to convert m/s to km/h
    speed = d_meters * time_constant
    return speed
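
# Example: an object that moves 20 px between consecutive frames covers
# 20 / 8 = 2.5 m per frame; at the assumed 15 fps that is 37.5 m/s, i.e.
# 2.5 * 15 * 3.6 = 135 km/h. Both ppm and the frame rate must be re-calibrated
# per video for the numbers to be meaningful.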


def intersect(A, B, C, D):
    return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D)


def ccw(A, B, C):
    return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])
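
# ccw() reports the winding direction of the triangle A -> B -> C, and
# intersect() uses it to test whether segments AB and CD cross: they intersect
# exactly when C and D lie on opposite sides of AB and A and B lie on opposite
# sides of CD. For example, intersect((0, 0), (2, 2), (0, 2), (2, 0)) is True;
# shared-endpoint and collinear edge cases are ignored here.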


def draw_boxes(img, bbox, object_id, identities=None, offset=(0, 0)):
    """Draw tracked boxes, trails, and per-class counts; return (img, total count).

    bbox is expected in top-left (x, y, w, h) format, with identities holding
    the tracker ids and object_id the class indices.
    """
    height, width, _ = img.shape

    # Drop the trail state of tracks that are no longer alive.
    for key in list(data_deque):
        if identities is None or key not in identities:
            data_deque.pop(key)

    for i, box in enumerate(bbox):
        x1, y1 = int(box[0]), int(box[1])
        x2, y2 = x1 + int(box[2]), y1 + int(box[3])
        box = [x1, y1, x2, y2]
        x1 += offset[0]
        x2 += offset[0]
        y1 += offset[1]
        y2 += offset[1]

        # Trail anchor: the bottom-center of the box (roughly where the
        # object touches the ground).
        center = (int((x1 + x2) / 2), int(y2))

        id = int(identities[i]) if identities is not None else 0

        # Create fresh buffers for tracks seen for the first time.
        if id not in data_deque:
            data_deque[id] = deque(maxlen=64)
            speed_four_line_queue[id] = []

        color = compute_color_for_labels(int(object_id[i]))
        obj_name = names[int(object_id[i])]
        label = obj_name

        data_deque[id].appendleft(center)
        # When the two newest trail points straddle the counting line, count
        # the object and record a speed sample.
        if len(data_deque[id]) >= 2:
            if intersect(data_deque[id][0], data_deque[id][1], line2[0], line2[1]):
                obj_speed = estimateSpeed(data_deque[id][1], data_deque[id][0])
                speed_four_line_queue[id].append(obj_speed)

                if obj_name not in object_counter:
                    object_counter[obj_name] = 1
                else:
                    object_counter[obj_name] += 1

        # Append the running average speed (km/h) to the label, if any samples exist.
        if speed_four_line_queue[id]:
            label += " " + str(sum(speed_four_line_queue[id]) // len(speed_four_line_queue[id]))

        UI_box(box, img, label=label, color=color, line_thickness=2)

        # Draw the motion trail, thicker for more recent points.
        for j in range(1, len(data_deque[id])):
            if data_deque[id][j - 1] is None or data_deque[id][j] is None:
                continue
            thickness = int(np.sqrt(64 / float(2 * j)) * 1.5)
            cv2.line(img, data_deque[id][j - 1], data_deque[id][j], color, thickness)

    # Overlay the per-class crossing counts in the top-right corner.
    count = 0
    for idx, (key, value) in enumerate(object_counter.items()):
        cnt_str = str(key) + ": " + str(value)
        cv2.line(img, (width - 150, 25 + (idx * 40)), (width, 25 + (idx * 40)), [85, 45, 255], 30)
        cv2.putText(img, cnt_str, (width - 150, 35 + (idx * 40)), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)
        count += value

    return img, count
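
# CAUTION: draw_boxes() interprets boxes as (x, y, w, h). The ByteTrack path
# below appears to feed it that format, while the DeepSORT outputs are corner
# boxes (x1, y1, x2, y2); if boxes render with doubled extents, convert with
# xyxy_to_tlwh() before calling draw_boxes().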


def load_yolov7_and_process_each_frame(model, vid_name, enable_GPU, save_video, confidence, assigned_class_id, kpi1_text, kpi2_text, kpi3_text, stframe):
    """Run YOLOv7 (ONNX) + ByteTrack on a video and stream annotated frames to Streamlit."""
    # Reset per-run tracking state.
    data_deque.clear()
    speed_four_line_queue.clear()
    object_counter.clear()

    if model == 'yolov7':
        weights = 'yolov7/weights/yolov7.onnx'
    elif model == 'yolov7-tiny':
        weights = 'yolov7/weights/yolov7-tiny.onnx'
    else:
        raise SystemExit('Model not found: %s' % model)

    detector = YOLOv7Detector(weights=weights, use_cuda=enable_GPU, use_onnx=True)
    tracker = ByteTrack(detector)
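
    # NOTE: YOLOv7Detector and ByteTrack are this project's wrapper classes;
    # tracker.inference() is assumed (from its use below) to return per-frame
    # (bboxes, track_ids, scores, class_ids) for detections above conf_thresh.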

    vdo = cv2.VideoCapture(vid_name)
    start = time.time()
    count = 0
    prevTime = 0

    # Mirror the input video's properties for the output writer.
    fourcc = 'mp4v'
    fps = vdo.get(cv2.CAP_PROP_FPS)
    w = int(vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))
    os.makedirs('inference/output', exist_ok=True)  # VideoWriter needs an existing directory
    vid_writer = cv2.VideoWriter('inference/output/results.mp4', cv2.VideoWriter_fourcc(*fourcc), fps, (w, h))

    while vdo.isOpened():
        ret, img = vdo.read()
        if not ret:
            break

        # Detect and track in one call, then draw the results.
        bboxes, ids, scores, obj_ids = tracker.inference(img, conf_thresh=confidence, classes=assigned_class_id)
        img, count = draw_boxes(img, bboxes, obj_ids, identities=ids)

        currTime = time.time()
        fps = 1 / (currTime - prevTime)
        prevTime = currTime

        # FPS overlay
        cv2.line(img, (20, 25), (127, 25), [85, 45, 255], 30)
        cv2.putText(img, f'FPS: {int(fps)}', (11, 35), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)

        if save_video:
            vid_writer.write(img)

        # Update the Streamlit dashboard.
        kpi1_text.write(f"<h1 style='text-align: center; color: red;'>{fps:.1f}</h1>", unsafe_allow_html=True)
        kpi2_text.write(f"<h1 style='text-align: center; color: red;'>{len(data_deque)}</h1>", unsafe_allow_html=True)
        kpi3_text.write(f"<h1 style='text-align: center; color: red;'>{count}</h1>", unsafe_allow_html=True)
        stframe.image(img, channels='BGR', use_column_width=True)

    end = time.time()
    print('Done. (%.3fs)' % (end - start))
    cv2.destroyAllWindows()
    vdo.release()
    vid_writer.release()


def load_yolor_and_process_each_frame(vid_name, enable_GPU, confidence, assigned_class_id, kpi1_text, kpi2_text, kpi3_text, stframe):
    """Run YOLOR + DeepSORT on a video (or webcam) and stream annotated frames to Streamlit."""
    # Reset per-run tracking state.
    data_deque.clear()
    speed_four_line_queue.clear()
    object_counter.clear()

    out, source, weights, save_txt, imgsz, cfg = \
        'inference/output', vid_name, 'yolor_p6.pt', False, 1280, 'cfg/yolor_p6.cfg'

    source = str(source)
    webcam = source == '0' or source.startswith('rtsp') or source.startswith('http') or source.endswith('.txt')

    # Initialize DeepSORT from its YAML config.
    cfg_deep = get_config()
    cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")
    deepsort = DeepSort(cfg_deep.DEEPSORT.REID_CKPT,
                        max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,
                        nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,
                        max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,
                        use_cuda=enable_GPU)

    # Select the inference device; YOLOR's select_device() takes '' / 'cpu' / a CUDA index.
    if enable_GPU:
        device = select_device('0')
    else:
        device = select_device('cpu')

    # Start with a fresh output directory.
    if os.path.exists(out):
        shutil.rmtree(out)
    os.makedirs(out)
    half = device.type != 'cpu'  # half precision only on CUDA

    # Load the YOLOR model.
    model = Darknet(cfg, imgsz)
    model.load_state_dict(torch.load(weights, map_location=device)['model'])
    model.to(device).eval()
    if half:
        model.half()

    # Optional second-stage classifier (disabled).
    classify = False
    if classify:
        modelc = load_classifier(name='resnet101', n=2)
        modelc.load_state_dict(torch.load('weights/resnet101.pt', map_location=device)['model'])
        modelc.to(device).eval()

    vid_path, vid_writer = None, None
    save_img = True
    if not webcam:
        dataset = LoadImages(source, img_size=imgsz, auto_size=64)

    t0 = time.time()
    # Warm up CUDA with one dummy forward pass.
    img = torch.zeros((1, 3, imgsz, imgsz), device=device)
    _ = model(img.half() if half else img) if device.type != 'cpu' else None
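    # The dummy pass above runs the model once on zeros so that CUDA kernel
    # selection and memory allocation happen outside the timed loop; without
    # it the first real frame would look much slower than it is.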

    prevTime = 0
    count = 0

    if webcam:
        vid = cv2.VideoCapture(0)

        while vid.isOpened():
            ret, img = vid.read()
            if not ret:
                continue

            im0s = img.copy()  # keep the original frame for drawing
            # BGR -> RGB, HWC -> CHW for the model. NOTE: frames are fed at
            # native size; YOLOR expects dimensions divisible by the model
            # stride, so some cameras may need letterboxing here.
            img = img[:, :, ::-1].transpose(2, 0, 1)

            img = torch.from_numpy(img.copy()).to(device)
            img = img.half() if half else img.float()
            img /= 255.0  # 0-255 -> 0.0-1.0
            if img.ndimension() == 3:
                img = img.unsqueeze(0)

            # Inference
            t1 = time_synchronized()
            pred = model(img)[0]

            # Apply NMS
            pred = non_max_suppression(pred, confidence, 0.5, classes=assigned_class_id, agnostic=False)
            t2 = time_synchronized()

            # Optional second-stage classification
            if classify:
                pred = apply_classifier(pred, modelc, img, im0s)

            # Process detections (batch size is 1 here)
            for i, det in enumerate(pred):
                p, s, im0 = "webcam_out.mp4", '', im0s

                s += '%gx%g ' % img.shape[2:]  # print string
                gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh
                if det is not None and len(det):
                    # Rescale boxes from the inference size to the original frame size
                    det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()

                    # Append the per-class detection counts to the print string
                    for c in det[:, -1].unique():
                        n = (det[:, -1] == c).sum()
                        s += '%g %ss, ' % (n, names[int(c)])

                    # Collect center-format boxes, confidences, and class ids for DeepSORT
                    xywh_bboxs = []
                    confs = []
                    oids = []
                    for *xyxy, conf, cls in det:
                        x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
                        xywh_bboxs.append([x_c, y_c, bbox_w, bbox_h])
                        confs.append([conf.item()])
                        oids.append(int(cls))
                    xywhs = torch.Tensor(xywh_bboxs)
                    confss = torch.Tensor(confs)

                    # Update the tracker and draw the tracked boxes
                    outputs = deepsort.update(xywhs, confss, oids, im0)
                    if len(outputs) > 0:
                        bbox_xyxy = outputs[:, :4]
                        identities = outputs[:, -2]
                        object_id = outputs[:, -1]
                        im0, count = draw_boxes(im0, bbox_xyxy, object_id, identities)

                print('%sDone. (%.3fs)' % (s, t2 - t1))

                # FPS overlay and Streamlit dashboard updates
                currTime = time.time()
                fps = 1 / (currTime - prevTime)
                prevTime = currTime
                cv2.line(im0, (20, 25), (127, 25), [85, 45, 255], 30)
                cv2.putText(im0, f'FPS: {int(fps)}', (11, 35), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)
                kpi1_text.write(f"<h1 style='text-align: center; color: red;'>{fps:.1f}</h1>", unsafe_allow_html=True)
                kpi2_text.write(f"<h1 style='text-align: center; color: red;'>{len(data_deque)}</h1>", unsafe_allow_html=True)
                kpi3_text.write(f"<h1 style='text-align: center; color: red;'>{count}</h1>", unsafe_allow_html=True)
                stframe.image(im0, channels='BGR', use_column_width=True)

    else:
        for path, img, im0s, vid_cap in dataset:
            img = torch.from_numpy(img).to(device)
            img = img.half() if half else img.float()
            img /= 255.0  # 0-255 -> 0.0-1.0
            if img.ndimension() == 3:
                img = img.unsqueeze(0)

            # Inference
            t1 = time_synchronized()
            pred = model(img)[0]

            # Apply NMS
            pred = non_max_suppression(pred, confidence, 0.5, classes=assigned_class_id, agnostic=False)
            t2 = time_synchronized()

            # Optional second-stage classification
            if classify:
                pred = apply_classifier(pred, modelc, img, im0s)

            # Process detections
            for i, det in enumerate(pred):
                if webcam:
                    p, s, im0 = path[i], '%g: ' % i, im0s[i].copy()
                else:
                    p, s, im0 = path, '', im0s

                save_path = str(Path(out) / Path(p).name)
                txt_path = str(Path(out) / Path(p).stem) + ('_%g' % dataset.frame if dataset.mode == 'video' else '')
                s += '%gx%g ' % img.shape[2:]  # print string
                gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh
                if det is not None and len(det):
                    # Rescale boxes from the inference size to the original frame size
                    det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()

                    # Append the per-class detection counts to the print string
                    for c in det[:, -1].unique():
                        n = (det[:, -1] == c).sum()
                        s += '%g %ss, ' % (n, names[int(c)])

                    # Collect center-format boxes, confidences, and class ids for DeepSORT
                    xywh_bboxs = []
                    confs = []
                    oids = []
                    for *xyxy, conf, cls in det:
                        x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
                        xywh_bboxs.append([x_c, y_c, bbox_w, bbox_h])
                        confs.append([conf.item()])
                        oids.append(int(cls))

                        if save_txt:
                            # Write one normalized-xywh label line per detection
                            xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()
                            with open(txt_path + '.txt', 'a') as f:
                                f.write(('%g ' * 5 + '\n') % (cls, *xywh))
                    xywhs = torch.Tensor(xywh_bboxs)
                    confss = torch.Tensor(confs)

                    # Update the tracker and draw the tracked boxes
                    outputs = deepsort.update(xywhs, confss, oids, im0)
                    if len(outputs) > 0:
                        bbox_xyxy = outputs[:, :4]
                        identities = outputs[:, -2]
                        object_id = outputs[:, -1]
                        im0, count = draw_boxes(im0, bbox_xyxy, object_id, identities)

                print('%sDone. (%.3fs)' % (s, t2 - t1))

                # FPS overlay
                currTime = time.time()
                fps = 1 / (currTime - prevTime)
                prevTime = currTime
                cv2.line(im0, (20, 25), (127, 25), [85, 45, 255], 30)
                cv2.putText(im0, f'FPS: {int(fps)}', (11, 35), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)
                kpi1_text.write(f"<h1 style='text-align: center; color: red;'>{fps:.1f}</h1>", unsafe_allow_html=True)

                # Save the annotated output
                if save_img:
                    if dataset.mode == 'images':
                        cv2.imwrite(save_path, im0)
                    else:
                        if vid_path != save_path:  # new video: open a fresh writer
                            vid_path = save_path
                            if isinstance(vid_writer, cv2.VideoWriter):
                                vid_writer.release()  # release the previous writer

                            fourcc = 'mp4v'
                            fps = vid_cap.get(cv2.CAP_PROP_FPS)
                            w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                            h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                            vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*fourcc), fps, (w, h))
                        vid_writer.write(im0)

                # Streamlit dashboard updates
                kpi2_text.write(f"<h1 style='text-align: center; color: red;'>{len(data_deque)}</h1>", unsafe_allow_html=True)
                kpi3_text.write(f"<h1 style='text-align: center; color: red;'>{count}</h1>", unsafe_allow_html=True)
                stframe.image(im0, channels='BGR', use_column_width=True)

    if save_txt or save_img:
        print('Results saved to %s' % Path(out))
        if platform == 'darwin':  # macOS: open the result automatically
            os.system('open ' + save_path)

    print('Done. (%.3fs)' % (time.time() - t0))
    cv2.destroyAllWindows()
    if webcam:
        vid.release()
    if isinstance(vid_writer, cv2.VideoWriter):
        vid_writer.release()
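

# A minimal, hypothetical Streamlit driver (the st.empty() placeholders and
# the sample video path are illustrative, not part of this module):
#
#   import streamlit as st
#
#   stframe = st.empty()
#   kpi1, kpi2, kpi3 = st.empty(), st.empty(), st.empty()
#   load_yolov7_and_process_each_frame(
#       'yolov7', 'inference/test.mp4', enable_GPU=False, save_video=True,
#       confidence=0.4, assigned_class_id=[0, 2, 3, 5],
#       kpi1_text=kpi1, kpi2_text=kpi2, kpi3_text=kpi3, stframe=stframe)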