# License_plate / app.py
# Hosting-page residue kept for provenance: uploaded by iamisha
# ("Upload 6 files", revision dd47838, verified).
import cv2
import tempfile
import streamlit as st
from ultralytics import YOLO, solutions
import os
import time
from collections import defaultdict
# Demo clip used when the user has not uploaded a video.
_DEMO_VIDEO = 'demo0.mp4'

# Side length (pixels) of the square crop handed to the character model.
_PLATE_INPUT_SIZE = 224

# Row gap (pixels) used only when no character box has a usable height.
_FALLBACK_ROW_GAP = 20

# Frame rate used for the output file when the container reports none.
_FALLBACK_FPS = 30

# Pins the sidebar width in both its expanded and collapsed states.
_SIDEBAR_CSS = """
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child{width: 350px;}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child{width: 350px; margin-left: -400px;}
</style>
"""


def _is_file_in_use(filepath):
    """Return True when renaming *filepath* onto itself fails.

    The rename is a no-op that raises ``OSError`` when the file is missing or
    cannot be renamed (for instance while another handle holds it open on
    platforms that lock open files).
    """
    try:
        os.rename(filepath, filepath)
    except OSError:
        return True
    return False


def _remove_temp_file(filepath, description):
    """Best-effort delete of a temporary file.

    Args:
        filepath: Path of the file to delete.
        description: Human-readable name used in the on-page error message.

    A file that appears to be in use is skipped silently; any other deletion
    failure is reported on the page instead of raising.
    """
    if _is_file_in_use(filepath):
        return
    try:
        os.remove(filepath)
    except OSError as e:
        st.write(f"Error deleting {description}: {e}")


def _materialize_video_source(video_file_buffer):
    """Return ``(path, is_temporary)`` for the video to process.

    An uploaded video is copied to a temporary ``.mp4`` file (``is_temporary``
    is True). Without an upload the demo video is used when it exists.
    Returns ``(None, False)`` when neither is available.
    """
    if video_file_buffer:
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
            # getvalue() returns the whole payload regardless of the current
            # read position of the uploaded buffer.
            tmp.write(video_file_buffer.getvalue())
        return tmp.name, True
    if os.path.exists(_DEMO_VIDEO):
        return _DEMO_VIDEO, False
    return None, False


def _choose_counting_line(w, h):
    """Render the line-orientation widgets and return the line end points.

    Args:
        w: Frame width in pixels.
        h: Frame height in pixels.

    Returns:
        A two-element list ``[(x0, y0), (x1, y1)]`` spanning the frame.
    """
    line_orientation = st.sidebar.radio('Line Orientation', ('Horizontal', 'Vertical'))
    if line_orientation == 'Horizontal':
        # The default is clamped so it can never exceed the slider maximum.
        vertical_shift = st.sidebar.slider(
            'Shift Amount (Vertical Shift)',
            min_value=0, max_value=h, value=min(250, h), step=50)
        return [(0, vertical_shift), (w, vertical_shift)]
    horizontal_shift = st.sidebar.slider(
        'Shift Amount (Horizontal Shift)',
        min_value=0, max_value=w, value=min(200, w), step=50)
    return [(horizontal_shift, 0), (horizontal_shift, h)]


def _write_counted_video(cap, model, line_points, confidence, output_path, fps, frame_size):
    """Track objects frame by frame and write the annotated video.

    Args:
        cap: An opened ``cv2.VideoCapture``; it is rewound to the first frame.
        model: YOLO model used for tracking.
        line_points: End points of the counting line.
        confidence: Minimum detection confidence passed to the tracker.
        output_path: Destination file for the annotated video.
        fps: Frame rate reported by the input video.
        frame_size: ``(width, height)`` of the output frames.
    """
    classes_to_count = [0, 1, 2]
    counter = solutions.ObjectCounter(
        view_img=False,
        reg_pts=line_points,
        names=model.names,
        draw_tracks=True,
        line_thickness=2,
        line_dist_thresh=50,
    )
    video_writer = cv2.VideoWriter(
        output_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps if fps > 0 else _FALLBACK_FPS,
        frame_size,
    )
    # The preview already consumed the first frame; start again from frame 0.
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    try:
        while cap.isOpened():
            success, im0 = cap.read()
            if not success:
                print("Video frame is empty or video processing has been successfully completed.")
                break
            tracks = model.track(
                im0, persist=True, show=False,
                classes=classes_to_count, conf=confidence)
            im0 = counter.start_counting(im0, tracks)
            video_writer.write(im0)
    finally:
        # Always flush and close the output so it can be read and deleted.
        video_writer.release()


def _run_video_mode(license_plate_model):
    """Render the video workflow: upload, preview, counting and download."""
    video_file_buffer = st.sidebar.file_uploader(
        "Upload a video", type=["mp4", "mov", 'avi', 'asf', 'm4v'])
    source_path, is_temporary = _materialize_video_source(video_file_buffer)
    if source_path is None:
        st.error(f"Demo video file not found: {_DEMO_VIDEO}. Please upload a video.")
        return

    cap = None
    output_video_path = None
    try:
        st.info('Input Video')
        st.sidebar.markdown('---')
        confidence = st.sidebar.slider('Confidence', min_value=0.0, max_value=1.0, value=0.25)
        st.sidebar.markdown('---')

        cap = cv2.VideoCapture(source_path)
        if not cap.isOpened():
            st.error(f"Error reading video file: {source_path}")
            return

        w, h, fps = (
            int(cap.get(prop))
            for prop in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS)
        )
        line_points = _choose_counting_line(w, h)
        st.sidebar.markdown('---')

        # Show the first frame with the counting line drawn on it.
        ret, frame = cap.read()
        if ret:
            cv2.line(frame, line_points[0], line_points[1], (255, 0, 0), 2)
            st.image(frame, channels="BGR")

        kpi1, kpi2, kpi3 = st.columns(3)
        with kpi1:
            st.markdown("**Frame Rate**")
            st.markdown(fps)
        with kpi2:
            st.markdown("**Height**")
            st.markdown(h)
        with kpi3:
            st.markdown("**Width**")
            st.markdown(w)

        if st.button('Process Video'):
            # Only the path is needed; close the handle so the writer can
            # open the file on platforms that lock open files.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.avi') as out_file:
                output_video_path = out_file.name
            _write_counted_video(
                cap, license_plate_model, line_points, confidence,
                output_video_path, fps, (w, h))

            if os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0:
                with open(output_video_path, 'rb') as f:
                    processed_bytes = f.read()
                st.download_button(
                    label="Download Processed Video",
                    data=processed_bytes,
                    file_name="processed_video.avi",
                    mime="video/avi",
                )
            else:
                st.error(f"Processed video file not found: {output_video_path}")
    finally:
        if cap is not None:
            cap.release()
        # Never delete the bundled demo video, only files created here.
        if is_temporary:
            _remove_temp_file(source_path, "temporary video file")
        if output_video_path is not None:
            _remove_temp_file(output_video_path, "output video file")


def _read_uploaded_image(image_file_buffer):
    """Decode an uploaded image through a temporary file.

    Returns:
        The decoded image as returned by ``cv2.imread`` (``None`` when the
        data cannot be decoded). The temporary file is removed before
        returning.
    """
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(image_file_buffer.getvalue())
    try:
        return cv2.imread(tmp.name)
    finally:
        _remove_temp_file(tmp.name, "temporary image file")


def _annotate_plate(clean_image, annotated, box, license_plate_model, character_model):
    """Draw one plate and its characters; return the detected characters.

    Args:
        clean_image: Unmodified input image; plate crops are taken from it so
            drawn boxes and labels never reach the character model.
        annotated: Copy of the image that receives all drawings.
        box: Plate detection box from the licence-plate model.
        license_plate_model: Model whose ``names`` label the plate.
        character_model: Model run on the resized plate crop.

    Returns:
        A list of ``(label, x, y, height)`` tuples in original-image
        coordinates, one per character box.

    Raises:
        ValueError: If the plate box has no area inside the image.
    """
    cls = int(box.cls.item())
    conf = float(box.conf.item())
    label = f"{license_plate_model.names[cls]} {conf:.2f}"

    img_h, img_w = clean_image.shape[:2]
    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
    x1, y1 = max(x1, 0), max(y1, 0)
    x2, y2 = min(x2, img_w), min(y2, img_h)

    cv2.rectangle(annotated, (x1, y1), (x2, y2), (255, 0, 0), 2)
    cv2.putText(annotated, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

    if x2 <= x1 or y2 <= y1:
        raise ValueError("license plate box has no area inside the image")

    plate_crop = clean_image[y1:y2, x1:x2]
    zoomed_plate = cv2.resize(plate_crop, (_PLATE_INPUT_SIZE, _PLATE_INPUT_SIZE))
    char_results = character_model.predict(zoomed_plate)

    # Factors mapping character-model coordinates back onto the original image.
    scale_x = (x2 - x1) / _PLATE_INPUT_SIZE
    scale_y = (y2 - y1) / _PLATE_INPUT_SIZE

    characters = []
    for char_result in char_results:
        for char_box in char_result.boxes:
            try:
                char_cls = int(char_box.cls.item())
                char_conf = float(char_box.conf.item())
                char_name = character_model.names[char_cls]
                char_label = f"{char_name} {char_conf:.2f}"
                cx1, cy1, cx2, cy2 = map(int, char_box.xyxy[0].tolist())
                ox1 = x1 + cx1 * scale_x
                oy1 = y1 + cy1 * scale_y
                ox2 = x1 + cx2 * scale_x
                oy2 = y1 + cy2 * scale_y
                cv2.rectangle(annotated, (int(ox1), int(oy1)), (int(ox2), int(oy2)), (0, 255, 0), 2)
                cv2.putText(annotated, char_label, (int(ox1), int(oy1) - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                characters.append((char_name, ox1, oy1, oy2 - oy1))
            except Exception as e:
                # Best effort: report the bad box and keep the others.
                st.write(f"Error processing character box: {e}")
                st.write(f"Character box data: {char_box}")
    return characters


def _group_characters_into_rows(characters):
    """Split characters into text rows ordered top-to-bottom, left-to-right.

    Args:
        characters: Iterable of ``(label, x, y, height)`` tuples.

    Returns:
        A list of rows; each row is a list of labels sorted by ``x``.

    A new row starts whenever the vertical gap between consecutive characters
    (sorted by ``y``) exceeds half the median character height. This avoids
    the boundary effect of a fixed pixel grid, where two characters of the
    same row can fall into different buckets.
    """
    ordered = sorted(characters, key=lambda item: item[2])
    if not ordered:
        return []

    heights = sorted(item[3] for item in ordered if item[3] > 0)
    max_gap = heights[len(heights) // 2] / 2 if heights else _FALLBACK_ROW_GAP

    rows = [[ordered[0]]]
    for previous, current in zip(ordered, ordered[1:]):
        if current[2] - previous[2] > max_gap:
            rows.append([current])
        else:
            rows[-1].append(current)
    return [
        [item[0] for item in sorted(row, key=lambda item: item[1])]
        for row in rows
    ]


def _run_image_mode(license_plate_model, character_model):
    """Render the image workflow: upload, plate detection and character read-out."""
    image_file_buffer = st.sidebar.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
    if image_file_buffer is None:
        return

    img = _read_uploaded_image(image_file_buffer)
    if img is None:
        st.error("Could not decode the uploaded image.")
        return

    st.text('Input Image')
    # OpenCV decodes to BGR, so the channel order must be declared.
    st.image(img, channels="BGR", use_column_width=True)

    if not st.button('Process Image'):
        return

    results = license_plate_model.predict(img)
    license_plate_class_index = 1  # Update this if needed for your model
    annotated = img.copy()
    rows = []
    labels = []

    for result in results:
        for box in result.boxes:
            if int(box.cls.item()) != license_plate_class_index:
                continue
            try:
                characters = _annotate_plate(
                    img, annotated, box, license_plate_model, character_model)
            except Exception as e:
                # Best effort: report the bad plate and continue with the rest.
                st.write(f"Error processing license plate box: {e}")
                st.write(f"License plate box data: {box}")
                continue
            labels.extend(item[0] for item in characters)
            # Rows are built per plate so separate plates never share a row.
            rows.extend(_group_characters_into_rows(characters))

    st.image(annotated, caption='Processed Image', channels="BGR", use_column_width=True)

    st.sidebar.markdown("### Detected Character Classes (Ordered by Rows)")
    for row_number, row_labels in enumerate(rows, start=1):
        st.sidebar.write(f"Row {row_number}:")
        st.sidebar.write("".join(row_labels))

    # Check if any detected class belongs to Bagmati State
    if any("BA" in name or "Bagmati" in name for name in labels):
        st.sidebar.write("It belongs to Bagmati State")


def main():
    """Build the Streamlit page and dispatch to the video or image workflow.

    The sidebar offers GPU and custom-class controls, a media-type switch and
    the per-mode settings. Both YOLO models are loaded from ``best24.pt`` and
    ``best.pt`` in the working directory on every script run.
    """
    st.title('Licence Plate Recognition and Vehicle Counting')
    st.sidebar.title('Settings')
    st.markdown(_SIDEBAR_CSS, unsafe_allow_html=True)

    # NOTE(review): the GPU checkbox and the custom-class selection are shown
    # but nothing consumes them yet; the class names listed here are not
    # verified against the model's own class ids, so they are not wired in.
    st.sidebar.checkbox('Enable GPU')
    custom_classes = st.sidebar.checkbox('Use Custom Classes')
    names = ['car', 'motorcycle', 'bus', 'train', 'truck', 'bike']
    if custom_classes:
        st.sidebar.multiselect('Select The Custom Classes', list(names), default='bike')

    media_type = st.sidebar.radio("Choose media type", ('Video', 'Image'))

    # Load the YOLO models
    license_plate_model = YOLO('best24.pt')
    character_model = YOLO('best.pt')  # replace with your character segmentation model

    if media_type == 'Video':
        _run_video_mode(license_plate_model)
    else:
        _run_image_mode(license_plate_model, character_model)
# Run the Streamlit app only when this file is executed as a script.
if __name__ == '__main__':
    main()