| import os |
| import streamlit as st |
| import firebase_admin |
| from firebase_admin import credentials, db, auth, firestore |
| import face_recognition |
| from PIL import Image |
| import numpy as np |
| import cv2 |
| import dlib |
| from io import BytesIO |
| import requests |
|
|
| |
| current_directory = os.path.dirname(os.path.abspath(__file__)) |
| skp_path = os.path.join(current_directory, "projectinsta-s-firebase-adminsdk-y6vlu-9a1345f468.json") |
|
|
| |
| if not firebase_admin._apps: |
| |
| cred = credentials.Certificate(skp_path) |
| firebase_admin.initialize_app(cred, { |
| 'databaseURL': 'https://projectinsta-s-default-rtdb.firebaseio.com/', |
| 'projectId': 'projectinsta-s' |
| }) |
|
|
| |
| ref = db.reference('/') |
|
|
| |
| db_firestore = firestore.client() |
|
|
| |
| if "auth_state" not in st.session_state: |
| st.session_state.auth_state = { |
| "user": None, |
| "signed_in": False, |
| } |
|
|
| |
| shape_predictor_path = "shape_predictor_68_face_landmarks.dat" |
| detector = dlib.get_frontal_face_detector() |
| shape_predictor = dlib.shape_predictor(shape_predictor_path) |
|
|
| |
| def authenticate_user(email, password): |
| try: |
| user = auth.get_user_by_email(email) |
| |
| return True, user |
| except auth.AuthError as e: |
| print(f"Authentication error: {str(e)}") |
| return False, None |
|
|
| |
| def create_user(email, password): |
| try: |
| user = auth.create_user( |
| email=email, |
| password=password |
| ) |
| return True, user.uid |
| except Exception as e: |
| print(f"User creation error: {str(e)}") |
| return False, None |
| |
| |
| def load_and_encode(image_file): |
| try: |
| |
| image_bytes = image_file.read() |
|
|
| |
| nparr = np.frombuffer(image_bytes, np.uint8) |
|
|
| |
| image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
|
|
| aligned_faces = detect_and_align_faces(image) |
|
|
| if aligned_faces is not None: |
| encodings = [] |
| for aligned_face in aligned_faces: |
| encoding = face_recognition.face_encodings(aligned_face) |
| if encoding: |
| encodings.append(encoding[0]) |
| |
| if encodings: |
| return encodings |
| else: |
| return None |
| else: |
| return None |
| except Exception as e: |
| print(f"Error loading and encoding image: {str(e)}") |
| return None |
|
|
| |
| def detect_and_align_faces(image): |
| |
| target_width = 800 |
| aspect_ratio = image.shape[1] / image.shape[0] |
| target_height = int(target_width / aspect_ratio) |
| resized_image = cv2.resize(image, (target_width, target_height)) |
|
|
| gray = cv2.cvtColor(resized_image, cv2.COLOR_BGR2GRAY) |
| |
| |
| faces = detector(gray) |
|
|
| if not faces: |
| return None |
|
|
| aligned_faces = [] |
| for face in faces: |
| |
| landmarks = shape_predictor(gray, face) |
| aligned_face = dlib.get_face_chip(resized_image, landmarks, size=256) |
| aligned_faces.append(aligned_face) |
| |
| return aligned_faces |
|
|
| |
| def add_person(name, image_path, instagram_handle, email=None): |
| try: |
| encoding = load_and_encode(image_path) |
| if not encoding: |
| return "No face found in the provided image." |
|
|
| |
| encoding = encoding[0].tolist() |
|
|
| |
| person_data = { |
| "encoding": encoding, |
| "info": { |
| "instagram_handle": instagram_handle, |
| "instagram_link": f"https://www.instagram.com/{instagram_handle}/" |
| }, |
| "added_by": st.session_state.auth_state["user"].email |
| } |
| if email: |
| person_data["info"]["email"] = email |
|
|
| ref.child(name).set(person_data) |
|
|
| log_action(st.session_state.auth_state["user"].email, "Added person") |
| return f"Success: {name} added to the database!" |
| except Exception as e: |
| return f"Failed to add person: {str(e)}" |
|
|
| |
| def recognize_face(image_path): |
| if not image_path: |
| return "Please upload an image." |
|
|
| try: |
| |
| unknown_encodings = load_and_encode(image_path) |
| if not unknown_encodings: |
| return "No face found in the provided image." |
|
|
| matches = [] |
| for unknown_encoding in unknown_encodings: |
| face_matches = [] |
| for name, data in ref.get().items(): |
| known_encoding = np.array(data["encoding"]) |
| if face_recognition.compare_faces([known_encoding], unknown_encoding)[0]: |
| info = data["info"] |
| instagram_handle = info.get("instagram_handle") |
| email = info.get("email", "Email not provided") |
| |
| |
| if instagram_handle: |
| insta_data = fetch_instagram_data(instagram_handle) |
| if insta_data: |
| edge_followed_by = insta_data.get('edge_followed_by', {}).get('count') |
| edge_follow = insta_data.get('edge_follow', {}).get('count') |
| full_name = insta_data.get('full_name') |
| biography = insta_data.get('biography') |
| is_private = insta_data.get('is_private') |
| profile_pic_url_hd = insta_data.get('profile_pic_url_hd') |
| face_matches.append((name, instagram_handle, email, edge_followed_by, edge_follow, full_name, biography, is_private, profile_pic_url_hd)) |
| else: |
| face_matches.append((name, instagram_handle, email, "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A")) |
| else: |
| face_matches.append((name, "Unknown", email, "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A")) |
|
|
| if face_matches: |
| matches.extend(face_matches) |
| else: |
| matches.append(("Unknown", "Unknown", "Unknown", "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A")) |
|
|
| if matches: |
| results = [] |
| for name, insta_handle, email, edge_followed_by, edge_follow, full_name, biography, is_private, profile_pic_url_hd in matches: |
| insta_link = f"https://www.instagram.com/{insta_handle}/" |
| insta_link_html = f'<a href="{insta_link}" target="_blank"><font color="red">{insta_handle}</font></a>' |
| results.append(f"- It's a picture of {name}! Insta handle: {insta_link_html}, Email: {email}, Followers: {edge_followed_by}, Following: {edge_follow}, Full Name: {full_name}, Biography: {biography}, Private: {is_private}, Profile Picture: <img src='{profile_pic_url_hd}' width='50'>") |
| log_action(st.session_state.auth_state["user"].email, "Recognized face") |
| return "\n".join(results) |
| else: |
| return "Face not found in the database." |
| except Exception as e: |
| return f"Failed to recognize face: {str(e)}" |
|
|
| def fetch_instagram_data(instagram_handle): |
| url = f"https://instagram-scraper-20231.p.rapidapi.com/userinfo/{instagram_handle}" |
| headers = { |
| "x-rapidapi-key": "f47a0f4918msha715fc855e591a2p170a28jsnfc038903d424", |
| "x-rapidapi-host": "instagram-scraper-20231.p.rapidapi.com" |
| } |
|
|
| try: |
| response = requests.get(url, headers=headers) |
| if response.status_code == 200: |
| data = response.json() |
| if data.get('status') == 'success': |
| return data.get('data', {}) |
| else: |
| return None |
| else: |
| return None |
| except Exception as e: |
| print(f"Error fetching Instagram data: {str(e)}") |
| return None |
|
|
| |
| |
| def recognize_face_optimal(image_path): |
| if not image_path: |
| return "Please upload an image." |
|
|
| try: |
| unknown_encodings = load_and_encode(image_path) |
| if not unknown_encodings: |
| return "No face found in the provided image." |
|
|
| matches = [] |
| for unknown_encoding in unknown_encodings: |
| face_matches = [] |
| for name, data in ref.get().items(): |
| known_encoding = np.array(data["encoding"]) |
| similarity_score = face_recognition.face_distance([known_encoding], unknown_encoding)[0] |
| if similarity_score > 0.50: |
| continue |
| face_matches.append((name, similarity_score)) |
|
|
| if face_matches: |
| best_match = min(face_matches, key=lambda x: x[1]) |
| best_name, best_score = best_match |
| info = ref.child(best_name).child("info").get() |
| insta_handle = info["instagram_handle"] |
| insta_link = info["instagram_link"] |
| insta_link_html = f'<a href="{insta_link}" target="_blank"><font color="red">{insta_handle}</font></a>' |
| matches.append(f"Best match: {best_name} with a similarity score of {1 - best_score:.2%}. Insta handle: {insta_link_html}") |
| else: |
| matches.append("Face not found in the database.") |
|
|
| return "\n".join(matches) |
| except Exception as e: |
| return f"Failed to recognize face: {str(e)}" |
| |
| |
| def delete_person(name): |
| try: |
| ref.child(name).delete() |
| log_action(st.session_state.auth_state["user"].email, "Deleted person") |
| return f"{name} deleted from the database!" |
| except Exception as e: |
| return f"Failed to delete person: {str(e)}" |
|
|
| |
| def delete_user(email, password): |
| |
| try: |
| user = auth.get_user_by_email(email) |
| if user: |
| |
| auth.delete_user(user.uid) |
| return "User deleted successfully!" |
| except Exception as e: |
| return f"Failed to delete user: {str(e)}" |
|
|
| |
| def send_feedback(feedback_data): |
| try: |
| db_firestore.collection('feedback').add(feedback_data) |
| except Exception as e: |
| st.error(f"Failed to submit feedback: {str(e)}") |
|
|
| |
| def add_person_ui(): |
| st.title("😎 Add Person") |
| name = st.text_input("Enter Name", help="Enter the name of the person") |
| image_path = st.file_uploader("Upload Image", help="Upload an image containing the person's face") |
| email = st.text_input("Enter Email (Optional)", help="Enter the person's email address (optional)") |
| instagram_handle = st.text_input("Enter Instagram Handle", help="Enter the person's Instagram handle") |
| if st.button("Add Person"): |
| if not name or not image_path or not instagram_handle: |
| st.error("Please fill all the required fields.") |
| else: |
| result = add_person(name, image_path, instagram_handle, email) |
| st.success(result) |
|
|
| |
| def recognize_face_ui(): |
| st.title("🔍 Recognize Face") |
| image_path = st.file_uploader("Upload Image", help="Upload an image for face recognition") |
| if st.button("Recognize Face"): |
| result = recognize_face(image_path) |
| st.write(result, unsafe_allow_html=True) |
|
|
| def recognize_face_optimal_ui(): |
| st.title("🔍 Recognize Face (Optimal)") |
| image_path = st.file_uploader("Upload Image", help="Upload an image for optimal face recognition") |
| if st.button("Recognize Face (Optimal)"): |
| result = recognize_face_optimal(image_path) |
| st.write(result, unsafe_allow_html=True) |
| |
| |
| def delete_person_ui(): |
| st.title("🗑️ Delete Person") |
| name = st.text_input("Enter Name", help="Enter the name of the person to delete") |
| if st.button("Delete Person"): |
| if not name: |
| st.error("Please enter a name.") |
| else: |
| |
| if name not in ref.get().keys(): |
| st.error("Person not found in the database.") |
| return |
|
|
| |
| person_data = ref.child(name).get() |
| if person_data["added_by"] != st.session_state.auth_state["user"].email: |
| st.error("You can only delete the person added by you.") |
| return |
|
|
| result = delete_person(name) |
| st.success(result) |
|
|
| |
| def feedback_ui(): |
| st.title("✍️ Feedback") |
| st.write("Your feedback is important to us! Please fill out the form below:") |
|
|
| name = st.text_input("Name (optional)") |
| email = st.text_input("Email (optional)") |
| category = st.selectbox("Category", ["Bug Report", "Feature Request", "General Feedback"]) |
| message = st.text_area("Feedback Message") |
|
|
| if st.button("Submit Feedback"): |
| if not message: |
| st.error("Please enter your feedback message.") |
| else: |
| feedback_data = { |
| "name": name, |
| "email": email, |
| "category": category, |
| "message": message, |
| } |
| send_feedback(feedback_data) |
| st.success("Feedback submitted successfully! Thank you for your feedback.") |
|
|
| |
|
|
| |
| def send_message(sender_email, receiver_email, message_content): |
| try: |
| |
| db_firestore.collection('messages').add({ |
| 'sender_email': sender_email, |
| 'receiver_email': receiver_email, |
| 'message_content': message_content, |
| 'timestamp': firestore.SERVER_TIMESTAMP |
| }) |
| return "Message sent successfully!" |
| except Exception as e: |
| return f"Failed to send message: {str(e)}" |
|
|
| |
| def get_messages(user_email): |
| try: |
| messages = db_firestore.collection('messages').where('receiver_email', '==', user_email).order_by('timestamp', direction=firestore.Query.DESCENDING).stream() |
| return messages |
| except Exception as e: |
| return None |
|
|
| |
| def messaging_ui(): |
| st.title("💬 Messaging") |
|
|
| if st.session_state.auth_state["signed_in"]: |
| sender_email = st.session_state.auth_state["user"].email |
| receiver_email = st.text_input("Receiver's Email", help="Enter the receiver's email address") |
| message_content = st.text_area("Message Content") |
|
|
| if st.button("Send Message"): |
| result = send_message(sender_email, receiver_email, message_content) |
| st.write(result) |
|
|
| messages = get_messages(sender_email) |
| if messages: |
| st.write("Your messages:") |
| for message in messages: |
| message_data = message.to_dict() |
| st.write(f"From: {message_data['sender_email']}") |
| st.write(f"Message: {message_data['message_content']}") |
| st.write("---") |
| else: |
| st.write("Please sign in to send and view messages.") |
| |
| |
|
|
| |
| def log_action(user_email, action): |
| try: |
| db_firestore.collection('history').add({ |
| 'user_email': user_email, |
| 'action': action, |
| 'timestamp': firestore.SERVER_TIMESTAMP |
| }) |
| except Exception as e: |
| st.error(f"Failed to log action: {str(e)}") |
|
|
| |
| def display_history(user_email): |
| st.title("📜 History") |
| st.write("Here is the history of actions taken by you:") |
|
|
| try: |
| history = db_firestore.collection('history').where('user_email', '==', user_email).order_by('timestamp', direction=firestore.Query.DESCENDING).stream() |
| for entry in history: |
| entry_data = entry.to_dict() |
| action = entry_data['action'] |
| timestamp = entry_data['timestamp'] |
| st.write(f"- {action} at {timestamp}") |
| except Exception as e: |
| st.error(f"Failed to retrieve history: {str(e)}") |
| |
|
|
| |
| def delete_user_ui(): |
| st.title("Delete User") |
| email = st.text_input("Enter User's Email", help="Enter the email of the user to delete") |
| password = st.text_input("Enter Your Password", type="password", help="Enter your password for confirmation") |
| if st.button("Delete User"): |
| if not email or not password: |
| st.error("Please enter the user's email and your password.") |
| else: |
| |
| confirmation = st.checkbox("I confirm that I want to delete this user.") |
| if confirmation: |
| result = delete_user(email, password) |
| st.success(result) |
|
|
| def tour_guide_ui(): |
| st.title("🗺️ Tour Guide") |
| st.markdown("This tour will guide you through the application.") |
|
|
| for step in steps: |
| with st.expander(step["title"]): |
| st.write(step["content"]) |
|
|
| def authenticate_user_ui(): |
| |
| c30, c31, c32 = st.columns([0.2, 0.1, 3]) |
| with c30: |
| st.caption("") |
| |
| logo_path = os.path.join(current_directory, "Explore+.png") |
| logo = Image.open(logo_path) |
| st.image(logo, width=60) |
|
|
| with c32: |
| st.title("Insta's EYE") |
| st.sidebar.title("Options") |
|
|
| if st.session_state.auth_state["signed_in"]: |
| st.sidebar.button("Sign Out", on_click=logout) |
| st.title("Welcome!") |
| main() |
| else: |
| option = st.sidebar.radio("Select Option", ["Login", "Sign-Up"]) |
|
|
| email = st.text_input("Enter Email", help="Enter your email address") |
| password = st.text_input("Enter Password", type="password", help="Enter your password") |
|
|
| if option == "Login": |
| if st.button("Login"): |
| if not email or not password: |
| st.error("Please enter both email and password.") |
| else: |
| success, user = authenticate_user(email, password) |
| if success: |
| st.session_state.auth_state["user"] = user |
| st.session_state.auth_state["signed_in"] = True |
| st.success("Authentication successful! You can now manage your set of images and profiles.") |
| main() |
| else: |
| st.error("Authentication failed. Please check your email and password.") |
|
|
| elif option == "Sign-Up": |
| confirm_password = st.text_input("Confirm Password", type="password", help="Re-enter your password for confirmation") |
| if st.button("Sign-Up"): |
| if not email or not password or not confirm_password: |
| st.error("Please fill all the fields.") |
| elif password != confirm_password: |
| st.error("Passwords do not match.") |
| else: |
| success, uid = create_user(email, password) |
| if success: |
| st.success(f"User with UID: {uid} created successfully! You can now log in.") |
| else: |
| st.error("User creation failed. Please try again.") |
|
|
| |
| def logout(): |
| st.session_state.auth_state["user"] = None |
| st.session_state.auth_state["signed_in"] = False |
|
|
| |
| steps = [ |
| { |
| "title": "Welcome to Insta's EYE", |
| "content": "This is a tour guide to help you navigate through the application.", |
| }, |
| { |
| "title": "Options Sidebar", |
| "content": "Here you can select different options such as adding a person, recognizing a face, deleting a person, or recognizing a face with optimal identification.", |
| }, |
| { |
| "title": "Recognize face", |
| "content": "This function will return you Id's for your provided face.", |
| }, |
| { |
| "title": "Recognize face(optimal)", |
| "content": "This function will return you Id's of provided face with more accuracy", |
| }, |
| { |
| "title": "Add person", |
| "content": "Here you can add yourself or someone else too", |
| }, |
| { |
| "title": "Delete person", |
| "content": "Here you can delete the person by entering their name", |
| }, |
| { |
| "title": "History", |
| "content": "Here you can track history of your activities", |
| }, |
| { |
| "title": "Upload Image", |
| "content": "You can upload an image here for face recognition or adding a person.", |
| }, |
| { |
| "title": "Text Input", |
| "content": "Enter text here such as the person's name or Instagram handle.", |
| }, |
| { |
| "title": "Buttons", |
| "content": "Click on these buttons to perform actions like adding a person or recognizing a face.", |
| }, |
| ] |
| |
| |
| def display_tour_steps(steps): |
| st.markdown("# Tour Guide") |
| st.markdown("This tour will guide you through the application.") |
| st.markdown("---") |
|
|
| for step in steps: |
| st.markdown(f"## {step['title']}") |
| st.write(step['content']) |
| st.markdown("---") |
| |
| |
| def main(): |
| st.sidebar.title("Options") |
| option = st.sidebar.radio("Select Option", ["Recognize Face", "Add Person", "Recognize Face (Optimal)", "Delete Person", "Tour Guide", "Feedback", "Messaging", "History", "Delete User"]) |
| |
| if option == "Recognize Face": |
| recognize_face_ui() |
| elif option == "Recognize Face (Optimal)": |
| recognize_face_optimal_ui() |
| elif option == "Add Person": |
| add_person_ui() |
| elif option == "Delete Person": |
| delete_person_ui() |
| elif option == "Tour Guide": |
| tour_guide_ui() |
| elif option == "Feedback": |
| feedback_ui() |
| elif option == "Messaging": |
| messaging_ui() |
| elif option == "History": |
| display_history(st.session_state.auth_state["user"].email) |
| elif option == "Delete User": |
| delete_user_ui() |
| |
| |
| if __name__ == "__main__": |
| authenticate_user_ui() |