import cv2 import tempfile import streamlit as st from ultralytics import YOLO, solutions import os import time from collections import defaultdict def main(): st.title('Licence Plate Recognition and Vehicle Counting') st.sidebar.title('Settings') st.markdown( """ """, unsafe_allow_html=True, ) enable_GPU = st.sidebar.checkbox('Enable GPU') custom_classes = st.sidebar.checkbox('Use Custom Classes') assigned_class_id = [] names = ['car', 'motorcycle', 'bus', 'train', 'truck', 'bike'] if custom_classes: assigned_class = st.sidebar.multiselect('Select The Custom Classes', list(names), default='bike') for each in assigned_class: assigned_class_id.append(names.index(each)) media_type = st.sidebar.radio("Choose media type", ('Video', 'Image')) # Load the YOLO models license_plate_model = YOLO('best24.pt') character_model = YOLO('best.pt') # replace with your character segmentation model def process_image(image): results = license_plate_model.predict(image) return results if media_type == 'Video': video_file_buffer = st.sidebar.file_uploader("Upload a video", type=["mp4", "mov", 'avi', 'asf', 'm4v']) DEMO_VIDEO = 'demo0.mp4' tffile = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) if not video_file_buffer: if os.path.exists(DEMO_VIDEO): tffile.name = DEMO_VIDEO vid = cv2.VideoCapture(DEMO_VIDEO) else: st.error(f"Demo video file not found: {DEMO_VIDEO}. Please upload a video.") return else: tffile.write(video_file_buffer.read()) tffile.close() # Ensure the file is closed before using it vid = cv2.VideoCapture(tffile.name) dem_vid = open(tffile.name, 'rb') demo_bytes = dem_vid.read() dem_vid.close() # Ensure the file is closed after reading stframe = st.empty() st.info('Input Video') # st.video(demo_bytes) st.sidebar.markdown('---') confidence = st.sidebar.slider('Confidence', min_value=0.0, max_value=1.0, value=0.25) st.sidebar.markdown('---') cap = cv2.VideoCapture(tffile.name) if not cap.isOpened(): st.error(f"Error reading video file: {tffile.name}") return w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS)) output_width = w output_height = h # Radio button to choose between vertical and horizontal line line_orientation = st.sidebar.radio('Line Orientation', ('Horizontal', 'Vertical')) # Slider for dynamic shift amount if line_orientation == 'Horizontal': vertical_shift = st.sidebar.slider('Shift Amount (Vertical Shift)', min_value=0, max_value=h, value=250, step=50) line_points = [(0, vertical_shift), (w, vertical_shift)] else: horizontal_shift = st.sidebar.slider('Shift Amount (Horizontal Shift)', min_value=0, max_value=w, value=200, step=50) line_points = [(horizontal_shift, 0), (horizontal_shift, h)] st.sidebar.markdown('---') # Display the initial frame with the line ret, frame = cap.read() if ret: cv2.line(frame, line_points[0], line_points[1], (255, 0, 0), 2) st.image(frame, channels="BGR") kpi1, kpi2, kpi3 = st.columns(3) with kpi1: st.markdown("**Frame Rate**") kpi1_text = st.markdown(fps) with kpi2: st.markdown("**Height**") kpi2_text = st.markdown(h) with kpi3: st.markdown("**Width**") kpi3_text = st.markdown(w) if st.button('Process Video'): classes_to_count = [0,1,2] output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.avi').name video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (output_width, output_height)) # Init Object Counter counter = solutions.ObjectCounter( view_img=False, reg_pts=line_points, names=license_plate_model.names, draw_tracks=True, line_thickness=2, line_dist_thresh=50 ) while cap.isOpened(): success, im0 = cap.read() if not success: print("Video frame is empty or video processing has been successfully completed.") break tracks = license_plate_model.track(im0, persist=True, show=False, classes=classes_to_count) im0 = counter.start_counting(im0, tracks) video_writer.write(im0) cap.release() video_writer.release() cv2.destroyAllWindows() # Provide a download link for the processed video if os.path.exists(output_video_path): with open(output_video_path, 'rb') as f: st.download_button( label="Download Processed Video", data=f, file_name="processed_video.avi", mime="video/avi" ) else: st.error(f"Processed video file not found: {output_video_path}") time.sleep(1) # Wait for a second before attempting to delete the file def is_file_in_use(filepath): try: os.rename(filepath, filepath) return False except OSError: return True if not is_file_in_use(tffile.name): try: os.remove(tffile.name) except Exception as e: st.write(f"Error deleting temporary video file: {e}") if not is_file_in_use(output_video_path): try: os.remove(output_video_path) except Exception as e: st.write(f"Error deleting output video file: {e}") else: image_file_buffer = st.sidebar.file_uploader("Upload an image", type=["jpg", "jpeg", "png"]) if image_file_buffer is not None: tffile = tempfile.NamedTemporaryFile(delete=False) tffile.write(image_file_buffer.read()) tffile.close() # Ensure the file is closed before using it img = cv2.imread(tffile.name) st.text('Input Image') st.image(img, use_column_width=True) if st.button('Process Image'): results = process_image(img) license_plate_class_index = 1 # Update this if needed for your model detected_classes = [] # To store detected classes from character segmentation # Filter results for license plate class for result in results: filtered_boxes = [box for box in result.boxes if int(box.cls.item()) == license_plate_class_index] for box in filtered_boxes: try: cls = int(box.cls.item()) conf = float(box.conf.item()) label = f"{license_plate_model.names[cls]} {conf:.2f}" x1, y1, x2, y2 = map(int, box.xyxy[0].tolist()) cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2) cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2) # Extract the license plate region, resize it, and pass it to the character model license_plate_region = img[y1:y2, x1:x2] zoomed_license_plate = cv2.resize(license_plate_region, (224, 224)) # Adjust size as needed char_results = character_model.predict(zoomed_license_plate) # Draw character boxes on the original image char_boxes = [] for char_result in char_results: for char_box in char_result.boxes: try: char_cls = int(char_box.cls.item()) char_conf = float(char_box.conf.item()) char_label = f"{character_model.names[char_cls]} {char_conf:.2f}" char_x1, char_y1, char_x2, char_y2 = map(int, char_box.xyxy[0].tolist()) # Adjust coordinates relative to the original image orig_char_x1 = x1 + char_x1 * ((x2 - x1) / 224) orig_char_y1 = y1 + char_y1 * ((y2 - y1) / 224) orig_char_x2 = x1 + char_x2 * ((x2 - x1) / 224) orig_char_y2 = y1 + char_y2 * ((y2 - y1) / 224) cv2.rectangle(img, (int(orig_char_x1), int(orig_char_y1)), (int(orig_char_x2), int(orig_char_y2)), (0, 255, 0), 2) cv2.putText(img, char_label, (int(orig_char_x1), int(orig_char_y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) detected_classes.append((character_model.names[char_cls], orig_char_x1, orig_char_y1)) char_boxes.append(((character_model.names[char_cls], orig_char_x1, orig_char_y1), (int(orig_char_x1), int(orig_char_y1), int(orig_char_x2), int(orig_char_y2)))) except Exception as e: st.write(f"Error processing character box: {e}") st.write(f"Character box data: {char_box}") except Exception as e: st.write(f"Error processing license plate box: {e}") st.write(f"License plate box data: {box}") st.image(img, caption='Processed Image', use_column_width=True) # Group detected classes by their y-coordinates grouped_classes = defaultdict(list) for cls, x, y in detected_classes: grouped_classes[int(y // 20)].append((cls, x)) # Adjust the divisor (20) as needed based on character height # Sort each group by x-coordinate for key in grouped_classes: grouped_classes[key].sort(key=lambda x: x[1]) # Print detected classes in order for each row st.sidebar.markdown("### Detected Character Classes (Ordered by Rows)") row_counter = 1 for key in sorted(grouped_classes.keys()): st.sidebar.write(f"Row {row_counter}:") c = "" for cls, _ in grouped_classes[key]: c += cls st.sidebar.write(c) row_counter += 1 # Check if any detected class belongs to Bagmati State if any("BA" in cls or "Bagmati" in cls for cls, _, _ in detected_classes): st.sidebar.write("It belongs to Bagmati State") time.sleep(1) # Wait for a second before attempting to delete the file def is_file_in_use(filepath): try: os.rename(filepath, filepath) return False except OSError: return True if not is_file_in_use(tffile.name): try: os.remove(tffile.name) except Exception as e: st.write(f"Error deleting temporary image file: {e}") if __name__ == '__main__': main()