Spaces:
Sleeping
Sleeping
| import cv2 | |
| import tempfile | |
| import streamlit as st | |
| from ultralytics import YOLO, solutions | |
| import os | |
| import time | |
| from collections import defaultdict | |
def main():
    """Streamlit app: licence-plate recognition with character read-out on
    images, and line-crossing vehicle counting on videos.

    Side effects: renders Streamlit widgets, reads/writes temporary media
    files, and loads two YOLO weight files from the working directory.
    """
    st.title('Licence Plate Recognition and Vehicle Counting')
    st.sidebar.title('Settings')
    # Widen the sidebar so the long widget labels fit.
    st.markdown(
        """
        <style>
        [data-testid="stSidebar"][aria-expanded="true"] > div:first-child{width: 350px;}
        [data-testid="stSidebar"][aria-expanded="false"] > div:first-child{width: 350px; margin-left: -400px;}
        </style>
        """,
        unsafe_allow_html=True,
    )
    # NOTE(review): enable_GPU is collected but never wired to a `device=`
    # argument of the YOLO calls — confirm intended before removing the widget.
    enable_GPU = st.sidebar.checkbox('Enable GPU')
    custom_classes = st.sidebar.checkbox('Use Custom Classes')
    assigned_class_id = []
    names = ['car', 'motorcycle', 'bus', 'train', 'truck', 'bike']
    if custom_classes:
        assigned_class = st.sidebar.multiselect('Select The Custom Classes', list(names), default='bike')
        assigned_class_id = [names.index(each) for each in assigned_class]
    media_type = st.sidebar.radio("Choose media type", ('Video', 'Image'))

    # Load the YOLO models once per script run.
    license_plate_model = YOLO('best24.pt')
    character_model = YOLO('best.pt')  # character segmentation model

    def process_image(image):
        """Run licence-plate detection on a single BGR image and return the results."""
        return license_plate_model.predict(image)

    def is_file_in_use(filepath):
        """Best-effort lock check: renaming a file onto itself raises OSError
        on Windows while another handle still holds it open."""
        try:
            os.rename(filepath, filepath)
            return False
        except OSError:
            return True

    if media_type == 'Video':
        video_file_buffer = st.sidebar.file_uploader("Upload a video", type=["mp4", "mov", 'avi', 'asf', 'm4v'])
        DEMO_VIDEO = 'demo0.mp4'
        # Track whether we fall back to the bundled demo so cleanup never
        # deletes it (the original code re-pointed the tempfile's name at the
        # demo and later removed it).
        using_demo = video_file_buffer is None
        if using_demo:
            if not os.path.exists(DEMO_VIDEO):
                st.error(f"Demo video file not found: {DEMO_VIDEO}. Please upload a video.")
                return
            video_path = DEMO_VIDEO
        else:
            # Persist the upload to disk so OpenCV can open it by path.
            tffile = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
            tffile.write(video_file_buffer.read())
            tffile.close()  # ensure the file is flushed/closed before use
            video_path = tffile.name

        st.info('Input Video')
        st.sidebar.markdown('---')
        confidence = st.sidebar.slider('Confidence', min_value=0.0, max_value=1.0, value=0.25)
        st.sidebar.markdown('---')

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            st.error(f"Error reading video file: {video_path}")
            return
        w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))
        output_width = w
        output_height = h

        # Counting line: orientation plus a slider-controlled offset.
        line_orientation = st.sidebar.radio('Line Orientation', ('Horizontal', 'Vertical'))
        if line_orientation == 'Horizontal':
            vertical_shift = st.sidebar.slider('Shift Amount (Vertical Shift)', min_value=0, max_value=h, value=250, step=50)
            line_points = [(0, vertical_shift), (w, vertical_shift)]
        else:
            horizontal_shift = st.sidebar.slider('Shift Amount (Horizontal Shift)', min_value=0, max_value=w, value=200, step=50)
            line_points = [(horizontal_shift, 0), (horizontal_shift, h)]
        st.sidebar.markdown('---')

        # Preview the first frame with the counting line drawn on it.
        ret, frame = cap.read()
        if ret:
            cv2.line(frame, line_points[0], line_points[1], (255, 0, 0), 2)
            st.image(frame, channels="BGR")
        # Rewind so processing includes frame 0 (the preview read consumed it).
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        kpi1, kpi2, kpi3 = st.columns(3)
        with kpi1:
            st.markdown("**Frame Rate**")
            st.markdown(fps)
        with kpi2:
            st.markdown("**Height**")
            st.markdown(h)
        with kpi3:
            st.markdown("**Width**")
            st.markdown(w)

        if st.button('Process Video'):
            # Honour the custom-class selection when present; otherwise keep
            # the default three classes.
            classes_to_count = assigned_class_id if assigned_class_id else [0, 1, 2]
            output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.avi').name
            video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (output_width, output_height))
            # Init object counter over the chosen line.
            counter = solutions.ObjectCounter(
                view_img=False,
                reg_pts=line_points,
                names=license_plate_model.names,
                draw_tracks=True,
                line_thickness=2,
                line_dist_thresh=50,
            )
            while cap.isOpened():
                success, im0 = cap.read()
                if not success:
                    print("Video frame is empty or video processing has been successfully completed.")
                    break
                # Pass the sidebar confidence through to tracking (it was
                # previously collected but unused).
                tracks = license_plate_model.track(im0, persist=True, show=False, conf=confidence, classes=classes_to_count)
                im0 = counter.start_counting(im0, tracks)
                video_writer.write(im0)
            cap.release()
            video_writer.release()
            cv2.destroyAllWindows()

            # Provide a download link for the processed video.
            if os.path.exists(output_video_path):
                with open(output_video_path, 'rb') as f:
                    st.download_button(
                        label="Download Processed Video",
                        data=f,
                        file_name="processed_video.avi",
                        mime="video/avi",
                    )
            else:
                st.error(f"Processed video file not found: {output_video_path}")

            time.sleep(1)  # Wait for a second before attempting to delete the file
            # Only the uploaded tempfile is ours to delete — never the demo video.
            if not using_demo and not is_file_in_use(video_path):
                try:
                    os.remove(video_path)
                except Exception as e:
                    st.write(f"Error deleting temporary video file: {e}")
            if not is_file_in_use(output_video_path):
                try:
                    os.remove(output_video_path)
                except Exception as e:
                    st.write(f"Error deleting output video file: {e}")
    else:
        image_file_buffer = st.sidebar.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
        if image_file_buffer is not None:
            tffile = tempfile.NamedTemporaryFile(delete=False)
            tffile.write(image_file_buffer.read())
            tffile.close()  # Ensure the file is closed before using it
            img = cv2.imread(tffile.name)
            st.text('Input Image')
            # cv2.imread returns BGR; tell Streamlit so colours render correctly.
            st.image(img, channels="BGR", use_column_width=True)

            if st.button('Process Image'):
                results = process_image(img)
                license_plate_class_index = 1  # Update this if needed for your model
                detected_classes = []  # (class name, x, y) per detected character

                # Filter results for the licence-plate class only.
                for result in results:
                    filtered_boxes = [box for box in result.boxes if int(box.cls.item()) == license_plate_class_index]
                    for box in filtered_boxes:
                        try:
                            cls = int(box.cls.item())
                            conf = float(box.conf.item())
                            label = f"{license_plate_model.names[cls]} {conf:.2f}"
                            x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                            cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
                            cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

                            # Crop the plate, resize to the character model's
                            # input size, and run character detection on it.
                            license_plate_region = img[y1:y2, x1:x2]
                            zoomed_license_plate = cv2.resize(license_plate_region, (224, 224))
                            char_results = character_model.predict(zoomed_license_plate)

                            for char_result in char_results:
                                for char_box in char_result.boxes:
                                    try:
                                        char_cls = int(char_box.cls.item())
                                        char_conf = float(char_box.conf.item())
                                        char_label = f"{character_model.names[char_cls]} {char_conf:.2f}"
                                        char_x1, char_y1, char_x2, char_y2 = map(int, char_box.xyxy[0].tolist())
                                        # Map the 224x224 crop coordinates back
                                        # onto the original image.
                                        orig_char_x1 = x1 + char_x1 * ((x2 - x1) / 224)
                                        orig_char_y1 = y1 + char_y1 * ((y2 - y1) / 224)
                                        orig_char_x2 = x1 + char_x2 * ((x2 - x1) / 224)
                                        orig_char_y2 = y1 + char_y2 * ((y2 - y1) / 224)
                                        cv2.rectangle(img, (int(orig_char_x1), int(orig_char_y1)), (int(orig_char_x2), int(orig_char_y2)), (0, 255, 0), 2)
                                        cv2.putText(img, char_label, (int(orig_char_x1), int(orig_char_y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                                        detected_classes.append((character_model.names[char_cls], orig_char_x1, orig_char_y1))
                                    except Exception as e:
                                        st.write(f"Error processing character box: {e}")
                                        st.write(f"Character box data: {char_box}")
                        except Exception as e:
                            st.write(f"Error processing license plate box: {e}")
                            st.write(f"License plate box data: {box}")

                st.image(img, channels="BGR", caption='Processed Image', use_column_width=True)

                # Group characters into rows by y, then order each row by x,
                # so multi-line plates read left-to-right, top-to-bottom.
                grouped_classes = defaultdict(list)
                for cls, x, y in detected_classes:
                    grouped_classes[int(y // 20)].append((cls, x))  # divisor ~ character height
                for key in grouped_classes:
                    grouped_classes[key].sort(key=lambda item: item[1])

                st.sidebar.markdown("### Detected Character Classes (Ordered by Rows)")
                for row_counter, key in enumerate(sorted(grouped_classes.keys()), start=1):
                    st.sidebar.write(f"Row {row_counter}:")
                    st.sidebar.write("".join(cls for cls, _ in grouped_classes[key]))

                # Check if any detected class belongs to Bagmati State.
                if any("BA" in cls or "Bagmati" in cls for cls, _, _ in detected_classes):
                    st.sidebar.write("It belongs to Bagmati State")

            time.sleep(1)  # Wait for a second before attempting to delete the file
            if not is_file_in_use(tffile.name):
                try:
                    os.remove(tffile.name)
                except Exception as e:
                    st.write(f"Error deleting temporary image file: {e}")
# Run the app only when executed as a script, not when imported.
if __name__ == '__main__':
    main()