| |
| """app.ipynb |
| |
| Automatically generated by Colab. |
| |
| Original file is located at |
| https://colab.research.google.com/drive/1e0kcqimC-jHT2w8MwJMSXxzS-wJh1jYS |
| """ |
|
|
| import json |
| import math |
| import os |
| import random |
| import heapq |
| import re |
| import sqlite3 |
| import gradio as gr |
| import numpy as np |
| import matplotlib.pyplot as plt |
| from scipy.ndimage import label, distance_transform_edt |
| from gtts import gTTS |
| from pydub import AudioSegment |
| import speech_recognition as sr |
| import openai |
|
|
| CATEGORIES_FILE = "categories.json" |
| PRODUCTS_FILE = "sort_data.json" |
| DB_PATH = "mall.db" |
| |
| |
|
|
|
|
| |
| |
| |
| CELL_SIZE = 0.2 |
| MEMORY_FILE = "navigation_memory.json" |
| GRID_0_FILE = "floor_0_grid.npy" |
| GRID_1_FILE = "floor_0_grid.npy" |
|
|
| OPENROUTER_KEY = "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" |
| OPENROUTER_MODEL = "tencent/hy3-preview:free" |
|
|
| def load_json_file(path, default): |
| try: |
| with open(path, "r", encoding="utf-8") as f: |
| return json.load(f) |
| except Exception: |
| return default |
|
|
| categories_data = load_json_file(CATEGORIES_FILE, []) |
| products_data = load_json_file(PRODUCTS_FILE, []) |
|
|
| |
| |
| |
| ROOM_INFO = { |
| 350: "Smart Devices Hub", |
| 351: "Gaming & Accessories Hub", |
| 352: "Computer Systems Hub", |
| 353: "Mobile & Tablets Hub", |
| 354: "Health & Personal Care", |
| 355: "Bedroom", |
| 356: "Living Room", |
| 357: "Study/Office", |
| 358: "Kitchen", |
| 450: "Dining Room", |
| 451: "Bathroom", |
| 452: "Furnishings", |
| 453: "Kitchen and Dining", |
| 454: "Home Decor", |
| 455: "Tools and utility", |
| 456: "Lighting and Electricals", |
| 457: "Cleaning and Bath", |
| 458: "Pet and Gardening", |
| } |
|
|
| STORE_CLUSTER_ROOM = { |
| "Smart Devices Hub": 350, |
| "Gaming & Accessories Hub": 351, |
| "Computer Systems Hub": 352, |
| "Mobile & Tablets Hub": 353, |
| "Health & Personal Care": 354, |
| "Bedroom": 355, |
| "Living Room": 356, |
| "Study/Office": 357, |
| "Kitchen": 358, |
| "Dining Room": 450, |
| "Bathroom": 451, |
| "Furnishings": 452, |
| "Kitchen and Dining": 453, |
| "Home Decor": 454, |
| "Tools and utility": 455, |
| "Lighting and Electricals": 456, |
| "Cleaning and Bath": 457, |
| "Pet and Gardening": 458, |
| } |
|
|
| STORE_CLUSTER = { |
| "Audio": "Smart Devices Hub", |
| "Cameras": "Smart Devices Hub", |
| "Smart Home Automation": "Smart Devices Hub", |
| "Smart Wearables": "Smart Devices Hub", |
| "Gaming": "Gaming & Accessories Hub", |
| "Laptop Accessories": "Gaming & Accessories Hub", |
| "Computer Peripherals": "Computer Systems Hub", |
| "Laptop & Desktop": "Computer Systems Hub", |
| "Storage": "Computer Systems Hub", |
| "Mobiles": "Mobile & Tablets Hub", |
| "Tablets": "Mobile & Tablets Hub", |
| } |
|
|
| SUB_TO_CATEGORY = {} |
| for type_block in categories_data: |
| for cat in type_block.get("categories", []): |
| for sub in cat.get("subCategories", []): |
| SUB_TO_CATEGORY[sub] = cat["category"] |
|
|
| NAME_TO_ROOM = {name.lower(): room for room, name in ROOM_INFO.items()} |
|
|
| |
| |
| |
| grid_0 = np.load(GRID_0_FILE) |
| grid_1 = np.load(GRID_1_FILE) |
| GRID_SHAPE = grid_0.shape |
|
|
| room_centroids_f3_grid = { |
| 350: (70, 465), |
| 351: (70, 435), |
| 352: (70, 395), |
| 353: (70, 360), |
| 354: (70, 320), |
| 355: (70, 285), |
| 356: (70, 215), |
| 357: (70, 160), |
| 358: (70, 85), |
| } |
|
|
| room_numbers_f4 = list(range(450, 459)) |
| room_centroids_f4_grid = {} |
| for old, new in zip(room_centroids_f3_grid.keys(), room_numbers_f4): |
| room_centroids_f4_grid[new] = room_centroids_f3_grid[old] |
|
|
|
|
| def apply_room_clearance(): |
| for r, c in room_centroids_f3_grid.values(): |
| for dr in [-1, 0, 1]: |
| for dc in [-1, 0, 1]: |
| rr, cc = r + dr, c + dc |
| if 0 <= rr < GRID_SHAPE[0] and 0 <= cc < GRID_SHAPE[1]: |
| grid_0[rr, cc] = 0 |
| grid_1[rr, cc] = 0 |
|
|
|
|
| apply_room_clearance() |
|
|
|
|
| def reset_grids(): |
| global grid_0, grid_1, GRID_SHAPE |
| grid_0 = np.load(GRID_0_FILE) |
| grid_1 = np.load(GRID_1_FILE) |
| GRID_SHAPE = grid_0.shape |
| apply_room_clearance() |
|
|
| |
| |
| |
| def load_memory(): |
| try: |
| with open(MEMORY_FILE, "r", encoding="utf-8") as f: |
| return json.load(f) |
| except Exception: |
| return [] |
|
|
|
|
| def save_memory(memory): |
| with open(MEMORY_FILE, "w", encoding="utf-8") as f: |
| json.dump(memory, f, indent=4) |
|
|
|
|
| user_memory = load_memory() |
| navigation_counter = len(user_memory) + 1 |
|
|
|
|
| def save_navigation_mem(start_room, dest_room): |
| global navigation_counter |
| entry = { |
| "nav_id": f"N{navigation_counter}", |
| "start_room": start_room, |
| "dest_room": dest_room, |
| } |
| user_memory.append(entry) |
| save_memory(user_memory) |
| navigation_counter += 1 |
| return entry |
|
|
| |
| |
| |
| client = openai.OpenAI( |
| base_url="https://openrouter.ai/api/v1", |
| api_key=OPENROUTER_KEY, |
| ) |
|
|
| SYSTEM_PROMPT = """ |
| You are an AI assistant for indoor navigation inside a shopping mall. |
| |
| Mall stores: |
| 350 Smart Devices Hub |
| 351 Gaming & Accessories Hub |
| 352 Computer Systems Hub |
| 353 Mobile & Tablets Hub |
| 354 Health & Personal Care |
| 355 Bedroom |
| 356 Living Room |
| 357 Study/Office |
| 358 Kitchen |
| 450 Dining Room |
| 451 Bathroom |
| 452 Furnishings |
| 453 Kitchen and Dining |
| 454 Home Decor |
| 455 Tools and utility |
| 456 Lighting and Electricals |
| 457 Cleaning and Bath |
| 458 Pet and Gardening |
| |
| Rules: |
| - If the user asks about a store, answer with store information. |
| - If the user asks for navigation, help them navigate. |
| - If the user says he is hungry, suggest kitchen / dining / food-related stores and when he tells you that he agree os suggestion then help him navigate. |
| - Keep answers concise and helpful. |
| """ |
|
|
| |
| |
| |
| def set_session_defaults(session): |
| session = session or {} |
| defaults = { |
| "username": None, |
| "gender": "other", |
| "low": 500, |
| "high": 5000, |
| "is_new": True, |
| "start_floor": None, |
| "start_xy": None, |
| "nav_mode": "Normal", |
| "recommended_room": None, |
| "nav_target_room": None, |
| "last_dest_room": None, |
| "last_referenced_room": None, |
| "selected_subcategory": None, |
| "accepted_store": False, |
| } |
| for k, v in defaults.items(): |
| session.setdefault(k, v) |
| return session |
|
|
|
|
| def get_floor(room): |
| if 350 <= room <= 358: |
| return 0 |
| if 450 <= room <= 458: |
| return 1 |
| return 0 |
|
|
|
|
| def get_centroid(room): |
| if room in room_centroids_f3_grid: |
| return room_centroids_f3_grid[room] |
| if room in room_centroids_f4_grid: |
| return room_centroids_f4_grid[room] |
| return None |
|
|
|
|
| def clamp_point(x, y, grid): |
| r = int(max(0, min(grid.shape[0] - 1, round(float(x))))) |
| c = int(max(0, min(grid.shape[1] - 1, round(float(y))))) |
| return r, c |
|
|
|
|
| def get_store_info(room): |
| info = { |
| 350: "Smart devices, wearables, smart home products.", |
| 351: "Gaming items, accessories, and related hardware.", |
| 352: "Computers, laptops, storage, and peripherals.", |
| 353: "Mobile phones, tablets, and accessories.", |
| 354: "Health products and personal care items.", |
| 355: "Bedroom furniture and decor.", |
| 356: "Living room furniture and decor.", |
| 357: "Office and study furniture and tools.", |
| 358: "Kitchen furniture and kitchen essentials.", |
| 450: "Dining room furniture and dining essentials.", |
| 451: "Bathroom items and accessories.", |
| 452: "Furnishings and home textiles.", |
| 453: "Kitchen and dining accessories.", |
| 454: "Home decor products.", |
| 455: "Tools and utility items.", |
| 456: "Lighting and electrical products.", |
| 457: "Cleaning and bath products.", |
| 458: "Pet and gardening products.", |
| } |
| name = ROOM_INFO.get(room) |
| if not name: |
| return None |
| return f"πͺ {name} (Room {room})\nπ {info.get(room, 'Mall store information is available for this store.')}" |
|
|
| def find_room_in_text(text): |
| text_l = (text or "").lower() |
| room_match = re.search(r"\b(3\d{2}|45[0-8])\b", text_l) |
| if room_match: |
| room = int(room_match.group(1)) |
| if room in ROOM_INFO: |
| return room |
|
|
| for name, room in sorted(NAME_TO_ROOM.items(), key=lambda x: len(x[0]), reverse=True): |
| if name in text_l: |
| return room |
| return None |
|
|
|
|
| def get_intro_message(): |
| return ( |
| "Hello π Welcome to the Mall Navigation Assistant.\n" |
| "You can navigate to:\n" |
| + "\n".join([f"- {name} ({room})" for room, name in ROOM_INFO.items()]) |
| + "\nIf you're hungry, you can go to the kitchen/dining or cafeteria-related stores." |
| ) |
|
|
|
|
| def local_chat_reply(user_text): |
| low = (user_text or "").lower().strip() |
| if not low: |
| return "Please type a message or ask about a store." |
| if any(word in low for word in ["hi", "hello", "hey", "good morning", "good evening", "how are you"]): |
| return "Hello π Welcome to SmartMall. Tell me a store name, a room number, or where you want to go." |
| if any(word in low for word in ["thanks", "thank you"]): |
| return "You are welcome." |
|
|
| room = find_room_in_text(user_text) |
| if room is not None and any(k in low for k in ["what", "tell", "about", "info", "describe", "where"]): |
| info = get_store_info(room) |
| if info: |
| return info |
|
|
| return None |
|
|
|
|
| |
| |
| |
| def save_preference(username, preference): |
| if not preference or len(preference) != 3: |
| return |
| main, cat, sub = preference |
| cursor.execute( |
| """INSERT INTO user_preferences (username, main_category, category, subcategory) |
| VALUES (?, ?, ?, ?)""", |
| (username, main, cat, sub), |
| ) |
| conn.commit() |
|
|
|
|
| def entry_point(): |
| print("\n=== Smart Mall System ===") |
| while True: |
| print("\n1) Signup") |
| print("2) Login") |
| choice = input("Enter choice: ").strip() |
| if choice == "1": |
| return "signup" |
| if choice == "2": |
| return "login" |
| print("Invalid input β") |
|
|
|
|
| def signup(): |
| print("\n=== SIGNUP ===") |
| while True: |
| username = input("Choose username: ").strip() |
| cursor.execute("SELECT username FROM users WHERE username=?", (username,)) |
| if cursor.fetchone(): |
| print("β Username exists, try again\n") |
| else: |
| break |
|
|
| password = input("Choose password: ").strip() |
| name = input("Enter name: ").strip() |
| age = int(input("Enter age: ")) |
| gender = input("Enter gender (male/female): ").strip().lower() |
|
|
| auto = input("Use auto budget? (yes/no): ").strip().lower() |
| if auto == "no": |
| lower = float(input("Lower budget: ")) |
| upper = float(input("Upper budget: ")) |
| else: |
| lower, upper = 500, 5000 |
|
|
| cursor.execute( |
| "INSERT INTO users VALUES (?, ?, ?, ?, ?, ?, ?)", |
| (username, password, name, age, gender, lower, upper), |
| ) |
| conn.commit() |
|
|
| print("\nβ
Signup successful") |
| return username, gender, lower, upper, None |
|
|
|
|
| def login(): |
| print("\n=== LOGIN ===") |
| while True: |
| username = input("Username: ").strip() |
| password = input("Password: ").strip() |
|
|
| cursor.execute( |
| "SELECT * FROM users WHERE username=? AND password=?", |
| (username, password), |
| ) |
| user = cursor.fetchone() |
|
|
| if user: |
| print("β
Login successful") |
| break |
| print("β Invalid credentials, try again\n") |
|
|
| cursor.execute( |
| """SELECT main_category, category, subcategory |
| FROM user_preferences |
| WHERE username=? |
| ORDER BY id DESC LIMIT 1""", |
| (username,), |
| ) |
| pref = cursor.fetchone() |
| preference = pref if pref else None |
|
|
| return username, user[4], user[5], user[6], preference |
|
|
|
|
| def auth_system(): |
| while True: |
| mode = entry_point() |
| if mode == "signup": |
| user_data = signup() |
| if user_data: |
| return user_data, True |
| elif mode == "login": |
| user_data = login() |
| if user_data: |
| return user_data, False |
| print("β Authentication failed, try again...\n") |
|
|
|
|
| def get_ble_location(): |
| print("\nπ Enter your current location") |
| x = float(input("X coordinate: ")) |
| y = float(input("Y coordinate: ")) |
| return (x, y) |
|
|
|
|
| def save_navigation(username, start_room, dest_room): |
| if isinstance(start_room, tuple): |
| start_room = f"{start_room[0]},{start_room[1]}" |
| cursor.execute( |
| "INSERT INTO history (username, start_point, end_store) VALUES (?, ?, ?)", |
| (username, start_room, str(dest_room)), |
| ) |
| conn.commit() |
|
|
| |
| |
| |
| def nearest_store(user_pos, user_floor): |
| best_room = None |
| best_dist = float("inf") |
|
|
| for _, room in STORE_CLUSTER_ROOM.items(): |
| |
| if get_floor(room) != user_floor: |
| continue |
|
|
| centroid = get_centroid(room) |
| if centroid is None: |
| continue |
|
|
| dist = np.sqrt((user_pos[0] - centroid[0]) ** 2 + (user_pos[1] - centroid[1]) ** 2) |
|
|
| if dist < best_dist: |
| best_dist = dist |
| best_room = room |
|
|
| return best_room |
|
|
|
|
| def most_visited_store(username): |
| cursor.execute( |
| """SELECT end_store, COUNT(*) as cnt |
| FROM history |
| WHERE username=? |
| GROUP BY end_store |
| ORDER BY cnt DESC""", |
| (username,), |
| ) |
| result = cursor.fetchone() |
| return int(result[0]) if result else None |
|
|
|
|
| |
| |
| |
| def clean_price(price_str): |
| digits = "".join(filter(str.isdigit, str(price_str))) |
| return int(digits) if digits else 0 |
|
|
|
|
| def get_categories_for_type(type_name): |
| for t in categories_data: |
| if t.get("type") == type_name: |
| return [c.get("category") for c in t.get("categories", [])] |
| return [] |
|
|
|
|
| def get_subcategories(type_name, category_name): |
| for t in categories_data: |
| if t.get("type") == type_name: |
| for c in t.get("categories", []): |
| if c.get("category") == category_name: |
| return c.get("subCategories", []) |
| return [] |
|
|
|
|
| def resolve_store_from_category(parent_category): |
| store_name = STORE_CLUSTER.get(parent_category, parent_category) |
| room = STORE_CLUSTER_ROOM.get(store_name) |
| return store_name, room |
|
|
|
|
| def category_flow(): |
| print("\nπ§ Category Selection") |
|
|
| print("\nMain categories:") |
| for i, c in enumerate(categories_data): |
| print(i, c["type"]) |
| main = int(input("Select main category: ")) |
| main_cat = categories_data[main] |
|
|
| print("\nSub categories:") |
| for i, c in enumerate(main_cat["categories"]): |
| print(i, c["category"]) |
| sub = int(input("Select category: ")) |
| sub_cat = main_cat["categories"][sub] |
|
|
| print("\nSub-sub categories:") |
| for i, c in enumerate(sub_cat["subCategories"]): |
| print(i, c) |
| sub_sub = int(input("Select subcategory: ")) |
|
|
| selected_subcategory = sub_cat["subCategories"][sub_sub] |
| if selected_subcategory is None: |
| print("β No subcategory selected") |
| return None, None, None, None |
|
|
| main_category = main_cat["type"] |
| parent_category = sub_cat["category"] |
| _, end_store = resolve_store_from_category(parent_category) |
| return selected_subcategory, end_store, parent_category, main_category |
|
|
|
|
| def get_budget(default_low, default_high): |
| print("\nπ° Budget Selection") |
| choice = input("Use auto budget? (yes/no): ").strip().lower() |
| if choice == "no": |
| lower = float(input("Enter lower budget: ")) |
| upper = float(input("Enter upper budget: ")) |
| else: |
| lower, upper = default_low, default_high |
| return lower, upper |
|
|
|
|
| def get_products(subcategory): |
| if not subcategory: |
| return [] |
|
|
| result = [] |
| subcategory = str(subcategory).strip().lower() |
|
|
| for type_block in products_data: |
| for cat in type_block.get("items", []): |
| cat_name = cat.get("category") |
| for sub in cat.get("items", []): |
| sub_name = sub.get("subCategory") |
| if cat_name and cat_name.lower() == subcategory: |
| result.extend(sub.get("items", [])) |
| elif sub_name and sub_name.lower() == subcategory: |
| result.extend(sub.get("items", [])) |
| return result |
|
|
|
|
| def filter_by_budget(products, low, high): |
| filtered = [] |
| for p in products: |
| price = clean_price(p.get("Price", 0)) |
| if low <= price <= high: |
| item = dict(p) |
| item["price_int"] = price |
| filtered.append(item) |
| return filtered |
|
|
|
|
| def gender_filter(products, gender): |
| if gender not in ["male", "female"]: |
| return products |
|
|
| keywords = { |
| "male": ["gaming", "tool", "power", "sports", "fitness"], |
| "female": ["beauty", "hair", "skin", "cosmetic", "makeup", "fashion"], |
| } |
|
|
| preferred, others = [], [] |
| for p in products: |
| name = str(p.get("Name", "")).lower() |
| if any(k in name for k in keywords[gender]): |
| preferred.append(p) |
| else: |
| others.append(p) |
| return preferred + others |
|
|
|
|
| def add_promotions(products): |
| result = [] |
| for p in products: |
| discount = random.randint(10, 50) |
| original = clean_price(p.get("Price", 0)) |
| discounted = int(original * (1 - discount / 100)) |
| result.append({ |
| "Name": p.get("Name", "Unknown Product"), |
| "Brand": p.get("Brand", "N/A"), |
| "Original": f"{original}", |
| "Discounted": f"{discounted}", |
| "Discount": discount, |
| "Rating": p.get("Rating", 0), |
| "Reviews": p.get("No of Reviews", 0), |
| }) |
| return result |
|
|
|
|
| def recommend_for_user(selected_subcategory, gender, low_budget, high_budget): |
| products = get_products(selected_subcategory) |
| if not products: |
| return {"store": "Unknown", "room": None, "products": []} |
|
|
| products = filter_by_budget(products, low_budget, high_budget) |
| if not products: |
| products = get_products(selected_subcategory) |
|
|
| products = gender_filter(products, gender) |
| products = sorted(products, key=lambda x: x.get("Rating", 0), reverse=True) |
| top_products = add_promotions(products[:5]) |
|
|
| parent_category = SUB_TO_CATEGORY.get(selected_subcategory) |
| store_name = STORE_CLUSTER.get(parent_category, parent_category) if parent_category else None |
| room = STORE_CLUSTER_ROOM.get(store_name) if store_name else None |
|
|
| return {"store": store_name or "Unknown", "room": room, "products": top_products} |
|
|
| |
| |
| |
| def find_nearest_free(grid, r, c): |
| free = (grid == 0) |
| _, ind = distance_transform_edt(~free, return_indices=True) |
| nr, nc = ind[:, r, c] |
| return int(nr), int(nc) |
|
|
|
|
| def extract_stairs(grid): |
| binary = (grid == 2).astype(int) |
| labeled, num = label(binary) |
| return labeled, num |
|
|
|
|
| stairs_0, n0 = extract_stairs(grid_0) |
| stairs_1, n1 = extract_stairs(grid_1) |
| stairs = {} |
| stair_cells = {0: {}, 1: {}} |
| for i in range(1, min(n0, n1) + 1): |
| sid = f"S{i}" |
| stairs[sid] = {"floors": [0, 1]} |
| for r, c in np.argwhere(stairs_0 == i): |
| stair_cells[0][(int(r), int(c))] = sid |
| for r, c in np.argwhere(stairs_1 == i): |
| stair_cells[1][(int(r), int(c))] = sid |
|
|
| elevator_cells = {0: set(), 1: set()} |
| for f in [0, 1]: |
| grid = grid_0 if f == 0 else grid_1 |
| coords = np.argwhere(grid == 3) |
| for r, c in coords: |
| elevator_cells[f].add((int(r), int(c))) |
|
|
| fire_active = False |
| fire_room = None |
| fire_step = 0 |
| fire_center = None |
| crowded_active = False |
| crowded_room = None |
| crowded_center = None |
|
|
|
|
| def mark_danger(grid, centroid, radius=20): |
| r0, c0 = centroid |
| for i in range(grid.shape[0]): |
| for j in range(grid.shape[1]): |
| dist = np.sqrt((i - r0) ** 2 + (j - c0) ** 2) |
| if dist < radius: |
| grid[i, j] = 1 |
|
|
|
|
| def mark_crowd(grid, centroid, room_radius=15, corridor_radius=40): |
| r0, c0 = centroid |
| for i in range(grid.shape[0]): |
| for j in range(grid.shape[1]): |
| dist = np.sqrt((i - r0) ** 2 + (j - c0) ** 2) |
| if dist < room_radius: |
| grid[i, j] = 5 |
| elif dist < corridor_radius and grid[i, j] == 0: |
| grid[i, j] = 6 |
|
|
|
|
| def heuristic(a, b): |
| return abs(a[1] - b[1]) + abs(a[2] - b[2]) + abs(a[0] - b[0]) * 10 |
|
|
|
|
| def neighbors(node, mode): |
| floor, r, c = node |
| grid = grid_0 if floor == 0 else grid_1 |
| moves = [(-1, 0), (1, 0), (0, -1), (0, 1)] |
| result = [] |
|
|
| for dr, dc in moves: |
| nr = r + dr |
| nc = c + dc |
| if 0 <= nr < grid.shape[0] and 0 <= nc < grid.shape[1]: |
| cell = grid[nr, nc] |
| if cell == 1: |
| continue |
|
|
| cost = 1 |
|
|
| if fire_active and fire_center is not None: |
| fr, fc = fire_center |
| dist = np.sqrt((nr - fr) ** 2 + (nc - fc) ** 2) |
| if dist < 120: |
| cost += (120 - dist) * 0.3 |
|
|
| if cell == 4: |
| cost += 8 |
|
|
| if mode == "Crowded": |
| if cell == 5: |
| cost += 200 |
| elif cell == 6: |
| cost += 40 |
|
|
| if mode == "Special Needs" and cell == 4: |
| cost += 100 |
|
|
| result.append(((floor, int(nr), int(nc)), cost)) |
|
|
| is_stair = (r, c) in stair_cells[floor] |
| is_elevator = (r, c) in elevator_cells[floor] |
|
|
| if mode == "Special Needs": |
| if is_elevator: |
| for f in [0, 1]: |
| if f != floor: |
| result.append(((f, r, c), 10)) |
| else: |
| if is_stair: |
| sid = stair_cells[floor][(r, c)] |
| cost = 15 if mode != "Crowded" else 40 |
| if fire_active and fire_center is not None: |
| fr, fc = fire_center |
| dist = np.sqrt((r - fr) ** 2 + (c - fc) ** 2) |
| if dist < 150: |
| cost += 200 |
| for f in stairs[sid]["floors"]: |
| if f != floor: |
| result.append(((f, r, c), cost)) |
|
|
| return result |
|
|
|
|
| def astar(start, goal, mode="Normal"): |
| open_set = [] |
| heapq.heappush(open_set, (0, start)) |
| came = {} |
| g = {start: 0} |
|
|
| while open_set: |
| _, cur = heapq.heappop(open_set) |
| if cur == goal: |
| path = [] |
| while cur in came: |
| path.append(cur) |
| cur = came[cur] |
| path.append(start) |
| return path[::-1] |
|
|
| for nxt, cost in neighbors(cur, mode): |
| ng = g[cur] + cost |
| if nxt not in g or ng < g[nxt]: |
| g[nxt] = ng |
| heapq.heappush(open_set, (ng + heuristic(nxt, goal), nxt)) |
| came[nxt] = cur |
| return None |
|
|
|
|
| def path_to_instructions(path): |
| if not path: |
| return "No path found." |
|
|
| instructions = [] |
| last_floor, last_r, last_c = path[0] |
| current_dir = None |
| dist = 0 |
|
|
| def add_instruction(direction, distance): |
| if distance > 0: |
| instructions.append(f"Move {direction} {distance * CELL_SIZE:.1f}m.") |
|
|
| for i in range(1, len(path)): |
| f, r, c = path[i] |
| if f != last_floor: |
| add_instruction(current_dir, dist) |
| instructions.append(f"Take stairs from floor {last_floor + 3} to floor {f + 3}.") |
| current_dir = None |
| dist = 0 |
| last_floor = f |
|
|
| dr = r - last_r |
| dc = c - last_c |
| if abs(dr) > abs(dc): |
| direction = "down" if dr > 0 else "up" |
| step = abs(dr) |
| else: |
| direction = "right" if dc > 0 else "left" |
| step = abs(dc) |
|
|
| if direction == current_dir: |
| dist += step |
| else: |
| add_instruction(current_dir, dist) |
| current_dir = direction |
| dist = step |
|
|
| last_r, last_c = r, c |
|
|
| add_instruction(current_dir, dist) |
| instructions.append("You have arrived at your destination room.") |
| return ". ".join(instructions) |
|
|
|
|
| def plot_static_path(path): |
| fig, axs = plt.subplots(1, 2, figsize=(18, 6)) |
| grids = [grid_0, grid_1] |
|
|
| for i, grid in enumerate(grids): |
| axs[i].imshow(grid, cmap="gray_r", origin="upper") |
| stairs_pos = np.where(grid == 2) |
| axs[i].scatter(stairs_pos[1], stairs_pos[0], s=40, label="Stairs") |
|
|
| xs = [p[2] for p in path if p[0] == i] |
| ys = [p[1] for p in path if p[0] == i] |
| if xs and ys: |
| axs[i].plot(xs, ys, linewidth=3, label="Path") |
| axs[i].scatter(xs[0], ys[0], s=80, label="Start") |
| axs[i].scatter(xs[-1], ys[-1], s=80, label="End") |
|
|
| axs[i].set_title(f"Floor {i + 3}") |
| axs[i].legend() |
|
|
| img_path = "/content/static_path.png" |
| plt.savefig(img_path) |
| plt.close() |
| return img_path |
|
|
|
|
| def navigate(start_floor, start_xy, dest_room, mode="Normal"): |
| if start_xy is None or dest_room is None: |
| return None, "β Missing start or destination." |
|
|
| sf = int(start_floor) |
| sr, sc = clamp_point(start_xy[0], start_xy[1], grid_0 if sf == 0 else grid_1) |
| sr, sc = find_nearest_free(grid_0 if sf == 0 else grid_1, sr, sc) |
|
|
| df = get_floor(dest_room) |
| dc = get_centroid(dest_room) |
| if dc is None: |
| return None, "β Could not find room centroid." |
|
|
| gr, gc = clamp_point(dc[0], dc[1], grid_0 if df == 0 else grid_1) |
| gr, gc = find_nearest_free(grid_0 if df == 0 else grid_1, gr, gc) |
|
|
| path = astar((sf, sr, sc), (df, gr, gc), mode) |
| instructions = path_to_instructions(path) |
| img_path = plot_static_path(path) if path else None |
| return img_path, instructions |
|
|
| |
| |
| |
| def chatbot_response_fallback(text): |
| local = local_chat_reply(text) |
| if local is not None: |
| return local |
| try: |
| response = client.chat.completions.create( |
| model=OPENROUTER_MODEL, |
| messages=[ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": text}, |
| ], |
| ) |
| content = response.choices[0].message.content |
| if content: |
| return content |
| except Exception: |
| pass |
| return "I am here to help with mall navigation and store information. Try a store name like Electronics Store or room 351." |
|
|
|
|
| def extract_rooms(user_text, history): |
| lower = (user_text or "").lower().strip() |
| room = find_room_in_text(user_text) |
|
|
| if room is not None: |
| if any(k in lower for k in ["what is", "tell me", "about", "info", "describe", "where is"]): |
| return "chat", None, room, False |
| if any(k in lower for k in ["take me", "guide me", "go there", "navigate", "go to", "show me"]): |
| return "navigation", None, room, True |
| return "navigation", None, room, False |
|
|
| if any(phrase in lower for phrase in ["take me there", "guide me there", "go there", "navigate me there"]): |
| return "navigation", None, None, True |
|
|
| conversation = "" |
| for u, b in history[-6:]: |
| if u: |
| conversation += f"User: {u}\n" |
| if b: |
| conversation += f"Assistant: {b}\n" |
|
|
| prompt = f""" |
| You are an AI assistant for indoor navigation inside a shopping mall. |
| |
| Extract navigation intent from the conversation. |
| |
| Return JSON only with: |
| {{ |
| "intent": "navigation" or "chat", |
| "start_room": null, |
| "dest_room": null, |
| "confirmed": true or false |
| }} |
| |
| Conversation: |
| {conversation} |
| |
| User message: |
| {user_text} |
| """ |
|
|
| try: |
| response = client.chat.completions.create( |
| model=OPENROUTER_MODEL, |
| messages=[{"role": "user", "content": prompt}], |
| response_format={"type": "json_object"}, |
| ) |
| data = json.loads(response.choices[0].message.content) |
| intent = data.get("intent", "chat") |
| start_room = data.get("start_room") |
| dest_room = data.get("dest_room") |
| confirmed = data.get("confirmed", False) |
| if start_room: |
| start_room = int(start_room) |
| if dest_room: |
| dest_room = int(dest_room) |
| return intent, start_room, dest_room, confirmed |
| except Exception: |
| return "chat", None, None, False |
|
|
|
|
| def chatbot_respond(user_text, history, session): |
| session = set_session_defaults(session) |
| history = history or [] |
| if not history: |
| history.append(("", get_intro_message())) |
|
|
| lower = (user_text or "").lower().strip() |
| local = local_chat_reply(user_text) |
| if local is not None and not any(k in lower for k in ["take me", "guide me", "go there", "navigate", "go to", "show me"]): |
| history.append((user_text, local)) |
| try: |
| gTTS(local).save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, None, "/content/voice.mp3", session |
|
|
| intent, _, dest_room, confirmed = extract_rooms(user_text, history) |
|
|
| info_room = find_room_in_text(user_text) |
| if info_room is not None and any(k in lower for k in ["what", "tell", "about", "info", "describe", "where"]): |
| info = get_store_info(info_room) |
| if info: |
| bot = f"{info}\n\nWould you like navigation to {ROOM_INFO[info_room]} (Room {info_room})?" |
| session["last_referenced_room"] = info_room |
| session["last_dest_room"] = info_room |
| history.append((user_text, bot)) |
| try: |
| gTTS(bot).save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, None, "/content/voice.mp3", session |
|
|
| if intent == "navigation" and dest_room is not None: |
| store_name = ROOM_INFO.get(dest_room, f"Room {dest_room}") |
| if not confirmed: |
| bot = f"I can guide you to {store_name} (Room {dest_room}). Do you want navigation?" |
| history.append((user_text, bot)) |
| session["last_referenced_room"] = dest_room |
| session["last_dest_room"] = dest_room |
| session["nav_target_room"] = dest_room |
| try: |
| gTTS(bot).save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, None, "/content/voice.mp3", session |
|
|
| if session.get("start_floor") is None or session.get("start_xy") is None: |
| bot = "Please set your starting location first in the Shop & Navigate tab." |
| history.append((user_text, bot)) |
| try: |
| gTTS(bot).save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, None, "/content/voice.mp3", session |
|
|
| img_path, instructions = navigate(session["start_floor"], session["start_xy"], dest_room, session.get("nav_mode", "Normal")) |
| save_navigation(session.get("username", "guest"), (session["start_floor"], session["start_xy"]), dest_room) |
| save_navigation_mem(f"floor={session['start_floor']},x={session['start_xy'][0]},y={session['start_xy'][1]}", dest_room) |
|
|
| bot = f"πΊοΈ Navigating to {store_name} (Room {dest_room})\n\n{instructions}" |
| history.append((user_text, bot)) |
| session["last_dest_room"] = dest_room |
| session["nav_target_room"] = dest_room |
| try: |
| gTTS(f"Navigating to {store_name}. {instructions.replace(chr(10), ' ')}").save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, img_path, "/content/voice.mp3", session |
|
|
| if any(phrase in lower for phrase in ["take me there", "guide me there", "go there", "navigate me there"]): |
| dest = ( |
| session.get("last_dest_room") |
| or session.get("nav_target_room") |
| or session.get("last_referenced_room") |
| or session.get("recommended_room") |
| ) |
| if dest is not None: |
| store_name = ROOM_INFO.get(dest, f"Room {dest}") |
| if session.get("start_floor") is None or session.get("start_xy") is None: |
| bot = f"I can take you to {store_name}, but I need your starting location first in the Shop & Navigate tab." |
| history.append((user_text, bot)) |
| try: |
| gTTS(bot).save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, None, "/content/voice.mp3", session |
|
|
| img_path, instructions = navigate(session["start_floor"], session["start_xy"], dest, session.get("nav_mode", "Normal")) |
| save_navigation(session.get("username", "guest"), (session["start_floor"], session["start_xy"]), dest) |
| save_navigation_mem(f"floor={session['start_floor']},x={session['start_xy'][0]},y={session['start_xy'][1]}", dest) |
|
|
| bot = f"πΊοΈ Navigating to {store_name} (Room {dest})\n\n{instructions}" |
| history.append((user_text, bot)) |
| session["last_dest_room"] = dest |
| session["nav_target_room"] = dest |
| try: |
| gTTS(f"Navigating to {store_name}. {instructions.replace(chr(10), ' ')}").save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, img_path, "/content/voice.mp3", session |
|
|
| bot = chatbot_response_fallback(user_text) |
| history.append((user_text, bot)) |
| try: |
| gTTS(bot).save("/content/voice.mp3") |
| except Exception: |
| pass |
| return history, None, "/content/voice.mp3", session |
|
|
|
|
| def speech_to_text(audio): |
| if audio is None: |
| return "" |
| try: |
| sound = AudioSegment.from_file(audio) |
| wav = "/content/temp.wav" |
| sound.export(wav, format="wav") |
| r = sr.Recognizer() |
| with sr.AudioFile(wav) as src: |
| data = r.record(src) |
| return r.recognize_google(data) |
| except Exception: |
| return "" |
|
|
| |
| |
| |
| def format_products_html(promos, store_name, room): |
| if not promos: |
| return "<div style='color:#ff6b6b;padding:20px;text-align:center;font-family:monospace'>β No products found in your budget range.</div>" |
|
|
| cards = "" |
| for p in promos: |
| stars = "β" * int(round(float(p.get("Rating", 0)))) |
| cards += f""" |
| <div style="background:linear-gradient(135deg,#1a1a2e,#16213e);border:1px solid #00d4ff33; |
| border-radius:12px;padding:16px;margin-bottom:12px;"> |
| <div style="display:flex;justify-content:space-between;align-items:flex-start;gap:10px;"> |
| <div style="flex:1;"> |
| <div style="font-size:11px;color:#888;font-family:monospace;margin-bottom:4px">{p.get('Brand','N/A')}</div> |
| <div style="color:#e0e0e0;font-size:13px;font-family:monospace;line-height:1.4">{p.get('Name','Unknown')}</div> |
| <div style="margin-top:8px;font-size:12px;color:#ffd700">{stars} {p.get('Rating',0)} ({int(p.get('Reviews',0)):,} reviews)</div> |
| </div> |
| <div style="text-align:right;flex-shrink:0;"> |
| <div style="color:#ff6b6b;font-size:12px;text-decoration:line-through;font-family:monospace">{p.get('Original','0')} EGP</div> |
| <div style="color:#00ff88;font-size:16px;font-weight:bold;font-family:monospace">{p.get('Discounted','0')} EGP</div> |
| <div style="background:#ff4500;color:white;border-radius:20px;padding:3px 10px; |
| font-size:11px;font-weight:bold;margin-top:4px;font-family:monospace">{p.get('Discount',0)}% OFF π₯</div> |
| </div> |
| </div> |
| </div>""" |
|
|
| return f""" |
| <div style="font-family:monospace;max-height:500px;overflow-y:auto;padding:4px;"> |
| <div style="color:#00d4ff;font-size:14px;font-weight:bold;margin-bottom:12px; |
| padding-bottom:8px;border-bottom:1px solid #00d4ff33;"> |
| πͺ {store_name} Β· Room {room} |
| </div> |
| {cards} |
| </div>""" |
|
|
|
|
| def set_location_ui(x, y, floor_choice, session): |
| session = set_session_defaults(session) |
| if x is None or y is None: |
| return "<div style='color:#ff6b6b;font-family:monospace'>β Please enter coordinates.</div>", session |
| session["start_floor"] = 0 if str(floor_choice) == "3" else 1 |
| session["start_xy"] = (float(x), float(y)) |
| session["start_room"] = session["start_xy"] |
| return f"<div style='color:#00ff88;font-family:monospace'>β
Location saved: floor {int(floor_choice)}, x={x}, y={y}</div>", session |
|
|
|
|
| def recommend_store_for_session(session): |
| session = set_session_defaults(session) |
| if not session.get("username"): |
| return "<div style='color:#ff6b6b;font-family:monospace'>β Please login first.</div>", session |
| if session.get("start_xy") is None or session.get("start_floor") is None: |
| return "<div style='color:#ff6b6b;font-family:monospace'>β Please enter your current location first.</div>", session |
|
|
| if session.get("is_new", True): |
| room = nearest_store(session["start_xy"], session["start_floor"]) |
| msg = "π New user β Nearest store recommended" |
| else: |
| room = most_visited_store(session["username"]) |
| if room is None: |
| room = nearest_store(session["start_xy"], session["start_floor"]) |
| msg = "π Returning user β Most visited store recommended" |
|
|
| store_name = ROOM_INFO.get(room, f"Room {room}") |
| session["recommended_room"] = room |
| session["nav_target_room"] = room |
| session["last_referenced_room"] = room |
| session["last_dest_room"] = room |
|
|
| html = f""" |
| <div style="background:linear-gradient(135deg,#1a1a2e,#0d2137);border:1px solid #00d4ff55; |
| border-radius:12px;padding:20px;font-family:monospace;"> |
| <div style="color:#888;font-size:11px;letter-spacing:2px">{msg}</div> |
| <div style="color:#00d4ff;font-size:22px;font-weight:bold;margin:8px 0">{store_name}</div> |
| <div style="color:#555;font-size:12px">Room {room}</div> |
| </div>""" |
| return html, session |
|
|
|
|
| def update_cat(type_name): |
| return gr.update(choices=get_categories_for_type(type_name), value=None) |
|
|
|
|
| def update_sub(type_name, cat_name): |
| return gr.update(choices=get_subcategories(type_name, cat_name), value=None) |
|
|
|
|
| def handle_agreement(choice, session): |
| session = set_session_defaults(session) |
| session["accepted_store"] = (choice or "").startswith("β
") |
| types = [t["type"] for t in categories_data] |
| if session["accepted_store"]: |
| return ( |
| gr.update(visible=False), |
| gr.update(visible=False), |
| gr.update(visible=False), |
| gr.update(choices=types, visible=True), |
| ) |
| return ( |
| gr.update(choices=types, visible=True), |
| gr.update(visible=True), |
| gr.update(visible=True), |
| gr.update(choices=types, visible=True), |
| ) |
|
|
|
|
| def do_navigate(choice, type_name, cat_name, sub_name, nav_mode_val, danger_room, session): |
| session = set_session_defaults(session) |
|
|
| if not session.get("username"): |
| return "β Please login first.", None |
|
|
| if session.get("start_xy") is None or session.get("start_floor") is None: |
| return "β Please set your location first.", None |
|
|
| reset_grids() |
|
|
| |
| |
| |
| if "Yes" in (choice or ""): |
| dest_room = session.get("recommended_room") |
| if dest_room is None: |
| return "β Please get a recommendation first.", None |
| else: |
| if not sub_name: |
| return "β Please select a sub-category.", None |
|
|
| parent = SUB_TO_CATEGORY.get(sub_name, cat_name) |
| store = STORE_CLUSTER.get(parent, parent) |
| dest_room = STORE_CLUSTER_ROOM.get(store) |
|
|
| if dest_room is None: |
| return "β Could not map the selected category to a store.", None |
|
|
| session["dest_room"] = dest_room |
|
|
| |
| |
| |
| if session.get("special_needs"): |
| effective_mode = "Special Needs" |
| else: |
| effective_mode = nav_mode_val |
|
|
| session["nav_mode"] = effective_mode |
|
|
| |
| |
| |
| room_val = None |
| try: |
| if danger_room is not None and str(danger_room).strip() != "": |
| room_val = int(float(danger_room)) |
| except Exception: |
| room_val = None |
|
|
| global fire_active, fire_room, fire_step, fire_center |
| global crowded_active, crowded_room, crowded_center |
|
|
| fire_active = False |
| crowded_active = False |
| fire_room = None |
| crowded_room = None |
| fire_center = None |
| crowded_center = None |
|
|
| if effective_mode == "Fire" and room_val is not None and room_val in ROOM_INFO: |
| fire_active = True |
| fire_room = room_val |
| fire_center = get_centroid(room_val) |
|
|
| if fire_center is not None: |
| fire_step += 1 |
| radius = 20 + fire_step * 6 |
|
|
| if get_floor(room_val) == 0: |
| mark_danger(grid_0, fire_center, radius) |
| else: |
| mark_danger(grid_1, fire_center, radius) |
|
|
| if effective_mode == "Crowded" and room_val is not None and room_val in ROOM_INFO: |
| crowded_active = True |
| crowded_room = room_val |
| crowded_center = get_centroid(room_val) |
|
|
| if crowded_center is not None: |
| if get_floor(room_val) == 0: |
| mark_crowd(grid_0, crowded_center) |
| else: |
| mark_crowd(grid_1, crowded_center) |
|
|
| |
| |
| |
| save_navigation( |
| session["username"], |
| (session["start_floor"], session["start_xy"]), |
| dest_room |
| ) |
|
|
| save_navigation_mem( |
| f"floor={session['start_floor']},x={session['start_xy'][0]},y={session['start_xy'][1]}", |
| dest_room |
| ) |
|
|
| |
| |
| |
| img_path, instructions = navigate( |
| session["start_floor"], |
| session["start_xy"], |
| dest_room, |
| effective_mode |
| ) |
|
|
| return instructions, img_path |
|
|
|
|
| def do_get_products(type_name, cat_name, sub_name, session): |
| session = set_session_defaults(session) |
| if not session.get("username"): |
| return "<div style='color:#ff6b6b;font-family:monospace'>β Please login first.</div>" |
| if not sub_name: |
| return "<div style='color:#ff6b6b;font-family:monospace'>β Please select a sub-category.</div>" |
|
|
| if type_name and cat_name and sub_name: |
| save_preference(session["username"], (type_name, cat_name, sub_name)) |
|
|
| gender = session.get("gender", "other") |
| low = session.get("low", 500) |
| high = session.get("high", 5000) |
| result = recommend_for_user(sub_name, gender, low, high) |
| session["selected_subcategory"] = sub_name |
| return format_products_html(result["products"], result["store"], result["room"]) |
|
|
| |
| |
| |
| with gr.Blocks( |
| theme=gr.themes.Base(primary_hue="cyan", secondary_hue="blue", neutral_hue="slate"), |
| css=""" |
| body { background: #0a0a14 !important; } |
| .gradio-container { background: #0a0a14 !important; max-width: 1200px !important; } |
| .tab-nav button { font-family: monospace; letter-spacing: 2px; font-size: 12px; } |
| .panel { background: #111827; border: 1px solid #1e3a5f; border-radius: 12px; padding: 20px; } |
| h1, h2, h3 { font-family: monospace !important; letter-spacing: 3px !important; } |
| footer { display: none !important; } |
| """, |
| title="SmartMall AI System", |
| ) as demo: |
|
|
| session_state = gr.State({}) |
|
|
| gr.HTML(""" |
| <div style="text-align:center;padding:24px 0 8px;font-family:monospace;"> |
| <div style="font-size:28px;font-weight:900;letter-spacing:6px; |
| background:linear-gradient(90deg,#00d4ff,#00ff88,#ff4466); |
| -webkit-background-clip:text;-webkit-text-fill-color:transparent"> |
| SMART MALL AI |
| </div> |
| <div style="color:#555;font-size:11px;letter-spacing:4px;margin-top:6px"> |
| NAVIGATION Β· RECOMMENDATIONS Β· ASSISTANT |
| </div> |
| </div> |
| """) |
|
|
| with gr.Tabs(): |
| |
| |
| |
| with gr.Tab("π LOGIN / SIGNUP"): |
| gr.HTML("<div style='color:#00d4ff;font-family:monospace;font-size:13px;margin-bottom:16px'>Enter your credentials to begin</div>") |
| with gr.Row(): |
| with gr.Column(scale=1): |
| auth_username = gr.Textbox(label="Username", placeholder="your_username") |
| auth_password = gr.Textbox(label="Password", type="password", placeholder="β’β’β’β’β’β’β’β’") |
| auth_name = gr.Textbox(label="Full Name (signup only)", placeholder="John Doe") |
| auth_age = gr.Number(label="Age (signup only)", value=25) |
| auth_gender = gr.Dropdown(["male", "female", "other"], label="Gender (signup only)", value="male") |
| special_needs = gr.Radio( |
| ["No", "Yes"], |
| label="Are you a person with special needs?", |
| value="No" |
| ) |
| auth_budget_l = gr.Number(label="Min Budget EGP (signup only)", value=500) |
| auth_budget_h = gr.Number(label="Max Budget EGP (signup only)", value=5000) |
|
|
| with gr.Row(): |
| signup_btn = gr.Button("SIGNUP", variant="primary") |
| login_btn = gr.Button("LOGIN", variant="secondary") |
| auth_status = gr.HTML() |
|
|
| def do_signup(username, password, name, age, gender, low, high,special, session): |
| cursor.execute("SELECT username FROM users WHERE username=?", (username,)) |
| if cursor.fetchone(): |
| return "<div style='color:#ff6b6b;font-family:monospace'>β Username already exists.</div>", session |
| cursor.execute( |
| "INSERT INTO users VALUES (?,?,?,?,?,?,?)", |
| (username, password, name, int(age), gender, low, high), |
| ) |
| conn.commit() |
| session = set_session_defaults(session) |
| session.update({"username": username, "gender": gender, "low": low, "high": high, "is_new": True}) |
| session["special_needs"] = (special == "Yes") |
| session["nav_mode"] = "Special Needs" if special == "Yes" else "Normal" |
| return f"<div style='color:#00ff88;font-family:monospace'>β
Welcome, {name}! Account created.</div>", session |
|
|
| def do_login(username, password, session): |
| cursor.execute("SELECT * FROM users WHERE username=? AND password=?", (username, password)) |
| user = cursor.fetchone() |
| if not user: |
| return "<div style='color:#ff6b6b;font-family:monospace'>β Invalid credentials.</div>", session |
| session = set_session_defaults(session) |
| session.update({"username": user[0], "gender": user[4], "low": user[5], "high": user[6], "is_new": False}) |
| return f"<div style='color:#00ff88;font-family:monospace'>β
Welcome back, {user[2]}!</div>", session |
|
|
| signup_btn.click( |
| do_signup, |
| [auth_username, auth_password, auth_name, auth_age, auth_gender, auth_budget_l, auth_budget_h, special_needs, session_state], |
| [auth_status, session_state] |
| ) |
| login_btn.click(do_login, [auth_username, auth_password, session_state], [auth_status, session_state]) |
|
|
| |
| |
| |
| with gr.Tab("πͺ SHOP & NAVIGATE"): |
| gr.HTML("<div style='color:#00d4ff;font-family:monospace;font-size:13px;margin-bottom:16px'>Get store recommendations and navigate</div>") |
|
|
| gr.HTML("<div style='color:#888;font-family:monospace;font-size:11px;margin-bottom:8px'>STEP 0 β SET YOUR LOCATION</div>") |
| with gr.Row(): |
| loc_x = gr.Number(label="X Coordinate") |
| loc_y = gr.Number(label="Y Coordinate") |
| loc_floor = gr.Dropdown(["3", "4"], label="Floor", value="3") |
| set_loc_btn = gr.Button("π Set Location") |
| loc_status = gr.HTML() |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| gr.HTML("<div style='color:#888;font-family:monospace;font-size:11px;margin-bottom:8px'>STEP 1 β GET RECOMMENDATION</div>") |
| recommend_btn = gr.Button("π― Get Store Recommendation", variant="primary") |
| rec_result_html = gr.HTML() |
| rec_agree = gr.Radio(["β
Yes, navigate me there", "β No, I'll choose my own"], label="Accept recommendation?") |
|
|
| gr.HTML("<div style='color:#888;font-family:monospace;font-size:11px;margin-top:20px;margin-bottom:8px'>STEP 2 β CHOOSE CATEGORY (for navigation if rejected, and for products)</div>") |
| cat_type = gr.Dropdown([t["type"] for t in categories_data], label="Main Category") |
| cat_cat = gr.Dropdown([], label="Category") |
| cat_sub = gr.Dropdown([], label="Sub-Category") |
| nav_mode = gr.Dropdown(["Normal", "Fire", "Crowded", "Special Needs"], label="Navigation Mode", value="Normal") |
| danger_room = gr.Textbox(label="Fire/Crowded Room Number", placeholder="Example: 350") |
| navigate_btn = gr.Button("πΊοΈ Start Navigation", variant="primary") |
|
|
| with gr.Column(scale=1): |
| nav_instructions = gr.Textbox(label="Navigation Instructions", lines=10, interactive=False) |
| nav_image = gr.Image(label="Mall Map", type="filepath") |
|
|
| gr.HTML("<div style='color:#888;font-family:monospace;font-size:11px;margin-top:20px;margin-bottom:8px'>STEP 3 β PRODUCT RECOMMENDATIONS</div>") |
| with gr.Row(): |
| prod_type = gr.Dropdown([], label="Category Type") |
| prod_cat = gr.Dropdown([], label="Category") |
| prod_sub = gr.Dropdown([], label="Sub-Category") |
| get_products_btn = gr.Button("ποΈ Get Product Recommendations", variant="primary") |
| products_html = gr.HTML() |
|
|
| set_loc_btn.click(set_location_ui, [loc_x, loc_y, loc_floor, session_state], [loc_status, session_state]) |
| recommend_btn.click(recommend_store_for_session, [session_state], [rec_result_html, session_state]) |
| rec_agree.change(handle_agreement, [rec_agree, session_state], [cat_type, cat_cat, cat_sub, prod_type]) |
| cat_type.change(update_cat, [cat_type], [cat_cat]) |
| cat_cat.change(update_sub, [cat_type, cat_cat], [cat_sub]) |
| prod_type.change(update_cat, [prod_type], [prod_cat]) |
| prod_cat.change(update_sub, [prod_type, prod_cat], [prod_sub]) |
|
|
| navigate_btn.click( |
| do_navigate, |
| [rec_agree, cat_type, cat_cat, cat_sub, nav_mode, danger_room, session_state], |
| [nav_instructions, nav_image], |
| ) |
|
|
| get_products_btn.click(do_get_products, [prod_type, prod_cat, prod_sub, session_state], [products_html]) |
|
|
| def init_dropdowns(): |
| types = [t["type"] for t in categories_data] |
| return gr.update(choices=types), gr.update(choices=types) |
|
|
| demo.load(init_dropdowns, [], [prod_type, cat_type]) |
|
|
| |
| |
| |
| with gr.Tab("π€ AI ASSISTANT"): |
| gr.HTML("<div style='color:#00d4ff;font-family:monospace;font-size:13px;margin-bottom:16px'>Chat with the mall AI assistant</div>") |
|
|
| chatbot_widget = gr.Chatbot( |
| label="SmartMall Assistant", |
| height=480, |
| value=[ |
| {"role": "assistant", "content": "π Welcome to SmartMall! I can help you navigate, find products, or answer any questions. How can I help?"} |
| ] |
| ) |
|
|
| with gr.Row(): |
| chat_msg = gr.Textbox(placeholder="Ask me anything about the mall...", label="Message", scale=4) |
| chat_mic = gr.Audio( |
| sources=["microphone"], |
| type="filepath", |
| label="π€", |
| scale=1 |
| ) |
|
|
| chat_audio_out = gr.Audio(label="Voice Response") |
| chat_img_out = gr.Image(label="Navigation Map", type="filepath") |
|
|
| def chat_respond(msg, history, session): |
| if not msg.strip(): |
| return history, None, None, session |
| history, img, audio, session = chatbot_respond(msg, history, session) |
| return history, img, audio, session |
|
|
| def voice_chat(audio, history, session): |
| text = speech_to_text(audio) |
| if not text: |
| history.append(("π€ [voice]", "Sorry, I couldn't understand the audio.")) |
| return history, None, None, session |
| return chat_respond(text, history, session) |
|
|
| chat_msg.submit(chat_respond, [chat_msg, chatbot_widget, session_state], [chatbot_widget, chat_img_out, chat_audio_out, session_state]) |
| chat_mic.change(voice_chat, [chat_mic, chatbot_widget, session_state], [chatbot_widget, chat_img_out, chat_audio_out, session_state]) |
|
|
| if __name__ == "__main__": |
| demo.launch() |