| import gradio as gr |
| import os |
| import re |
| import torch |
| import gc |
| from PIL import Image |
| from transformers import pipeline |
| from langchain_chroma import Chroma |
| from langchain_community.document_loaders import PyPDFLoader, TextLoader |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_core.documents import Document |
| from langchain_huggingface import HuggingFaceEmbeddings |
| from ultralytics import YOLO |
|
|
| |
| CHROMA_PATH = "/tmp/chroma_db" |
| VISION_MODEL = "HuggingFaceTB/SmolVLM-Instruct" |
|
|
| |
| print("βοΈ Loading Stable Vision Engine...") |
| vision_pipe = pipeline( |
| "image-text-to-text", |
| model=VISION_MODEL, |
| model_kwargs={"dtype": torch.float32}, |
| device="cpu" |
| ) |
|
|
| print("π Loading Embedding Engine...") |
| embed_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") |
|
|
| |
# COCO class ids that look like drink containers: 39=bottle, 40=wine glass, 41=cup.
_DRINK_CLASS_IDS = {39, 40, 41}
# Fraction of the box size added as padding so the label stays readable.
_PAD_FRACTION = 0.25


def get_bottle_crops(image_path):
    """Detect drink containers in the image and return padded PIL crops.

    Falls back to the full image when nothing is detected, and to an empty
    list when detection itself fails (both cases are handled by the caller).
    """
    print(f"π DEBUG: Starting YOLO on {image_path}")
    found_crops = []

    try:
        original_img = Image.open(image_path).convert("RGB")
        img_w, img_h = original_img.size

        # Loaded per call (and freed below) to keep steady-state memory low.
        yolo_model = YOLO("yolov8n.pt")
        # Low confidence threshold: better to over-crop than to miss a bottle.
        results = yolo_model(image_path, verbose=False, conf=0.1)

        for r in results:
            for box in r.boxes:
                if int(box.cls) not in _DRINK_CLASS_IDS:
                    continue
                x1, y1, x2, y2 = box.xyxy[0].tolist()

                # Expand the box by a fraction of its size, clamped to the image.
                box_w, box_h = x2 - x1, y2 - y1
                pad_x = int(box_w * _PAD_FRACTION)
                pad_y = int(box_h * _PAD_FRACTION)

                left = max(0, int(x1) - pad_x)
                top = max(0, int(y1) - pad_y)
                right = min(img_w, int(x2) + pad_x)
                bottom = min(img_h, int(y2) + pad_y)

                found_crops.append(original_img.crop((left, top, right, bottom)))

        # Release the detector eagerly; inference servers are memory-tight.
        del yolo_model
        gc.collect()
        return found_crops if found_crops else [original_img]
    except Exception as e:
        print(f"β YOLO Error: {e}")
        return []
|
|
| |
def ingest_recipes(files):
    """Load uploaded .txt/.pdf recipe files, split them into one chunk per
    recipe (on lines starting with "Recipe:"), and persist the chunks into
    the Chroma vector store.

    Returns a human-readable status string for the UI.
    """
    if not files:
        return "β No files uploaded."

    docs = []
    for f in files:
        try:
            if f.name.endswith(".txt"):
                docs.extend(TextLoader(f.name).load())
            elif f.name.endswith(".pdf"):
                docs.extend(PyPDFLoader(f.name).load())
        except Exception as e:
            # Best effort: skip unreadable files but keep ingesting the rest.
            print(f"Error: {e}")

    if not docs:
        return "β Could not extract text."

    # One chunk per recipe: split wherever a line begins with "Recipe:".
    full_text = "\n".join(d.page_content for d in docs)
    raw_chunks = re.split(r'(?m)^(?=Recipe:)', full_text)

    split_docs = []
    for chunk in raw_chunks:
        # Strip decorative separator runs ("⸻") left over from PDF extraction.
        # (Restored from a mojibake'd pattern in the original source.)
        clean_chunk = re.sub(r'⸻+', '', chunk).strip()
        if len(clean_chunk) > 20:  # drop headers and empty fragments
            split_docs.append(Document(page_content=clean_chunk))

    try:
        Chroma.from_documents(split_docs, embed_model, persist_directory=CHROMA_PATH)
        # BUG FIX: this f-string was split across two physical lines by an
        # encoding mangle (unterminated string literal); rejoined onto one line.
        return f"✅ Bar library updated. Strictly split into {len(split_docs)} individual recipes."
    except Exception as e:
        return f"β Database Error: {e}"
|
|
| |
def bartend(message, history, img_path, inventory):
    """Chat handler: optionally identify the bottle in *img_path* with the
    vision model, retrieve matching recipes from the vector store, and append
    the exchange to *history*.

    Returns (history, inventory, debug_images) matching the Gradio outputs.
    """
    debug_images = []

    if img_path:
        crops = get_bottle_crops(img_path)
        debug_images = crops

        # Prefer the first YOLO crop; fall back to the full photo.
        target_img = crops[0] if crops else Image.open(img_path).convert("RGB")

        def identify_spirit(image_input):
            # Work on a small RGB copy so the VLM call stays fast on CPU.
            fast_img = image_input.copy()
            if fast_img.mode != "RGB":
                fast_img = fast_img.convert("RGB")
            fast_img.thumbnail((384, 384))

            prompt = "User: <image>\nRead the label. What is the specific brand and type of alcohol? Be precise.\nAssistant:"

            out = vision_pipe(fast_img, prompt, generate_kwargs={"max_new_tokens": 15})
            text = out[0]['generated_text']
            # Keep only the assistant's answer when the template echoes back.
            if "Assistant:" in text:
                return text.split("Assistant:")[-1].strip()
            return text.replace("User: <image>", "").strip()

        try:
            inventory = identify_spirit(target_img)
            # Drop any stray tags and keep just the first sentence.
            inventory = re.sub(r'<.*?>', '', inventory).strip().split('.')[0]
            print(f"π Pass 1 Result: {inventory}")

            generic_terms = ["vodka", "gin", "rum", "tequila", "whiskey", "whisky", "bourbon", "brandy", "alcohol", "liquor", "spirit", "bottle", "drink"]

            # If the crop only yielded a generic word, retry on the full photo.
            if inventory.lower() in generic_terms or len(inventory) < 4:
                print("β οΈ Result too generic. Trying FULL IMAGE...")
                full_img_result = identify_spirit(Image.open(img_path))
                full_img_result = re.sub(r'<.*?>', '', full_img_result).strip().split('.')[0]
                if len(full_img_result) > len(inventory):
                    inventory = full_img_result
                # BUG FIX: this print's f-string was split across two physical
                # lines by an encoding mangle (unterminated literal); rejoined.
                print(f"Pass 2 Result: {inventory}")

        except Exception as e:
            print(f"β Vision Failed: {e}")
            inventory = "Unknown Spirit"

    # --- Retrieval: look up recipes only when we identified something real.
    recipe_context = ""
    if inventory and inventory not in ["Empty Shelf", "Unknown Spirit", ""]:
        try:
            if os.path.exists(CHROMA_PATH):
                vs = Chroma(persist_directory=CHROMA_PATH, embedding_function=embed_model)
                search_query = f"Cocktail recipe using {inventory}"

                results = vs.similarity_search(search_query, k=4)
                recipe_context = "\n\n---\n\n".join([d.page_content for d in results])
        except Exception as e:
            print(f"Search error: {e}")

    # --- Compose the reply.
    if inventory == "Unknown Spirit":
        response = "I'm having trouble reading that label. Check the 'Vision Debug' gallery belowβis the crop clear?"
    elif recipe_context:
        response = f"I see you have **{inventory}**. Here are a few options from your collection:\n\n{recipe_context}"
    else:
        response = f"I see you have **{inventory}**! I don't have a specific recipe for that in the current library. Should I suggest a classic drink?"

    # Append the turn in the Chatbot "messages" dict format.
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": response})

    return history, inventory, debug_images
|
|
| |
# --- Gradio UI layout and event wiring ---
# Theme is configured on gr.Blocks (launch() does not accept a theme kwarg).
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# πΈ LocalAGI: The AI Sommelier")
    # Last identified spirit, carried between chat turns.
    inv_state = gr.State("Empty Shelf")

    with gr.Row():
        with gr.Column(scale=1):
            file_up = gr.File(label="1. Upload Recipe PDFs/TXTs", file_count="multiple")
            ingest_btn = gr.Button("π₯ Load Recipes into Memory")
            status = gr.Textbox(label="System Status", value="Ready")

            gr.Markdown("---")
            img = gr.Image(type="filepath", label="2. Photo of your Bottle")

            with gr.Accordion("π Vision Debug", open=False):
                debug_gallery = gr.Gallery(label="YOLO Crops", columns=2, height="auto")

        with gr.Column(scale=2):
            # BUG FIX: bartend() appends {"role": ..., "content": ...} dicts,
            # which requires the "messages" chat format (the default tuple
            # format cannot render dict entries).
            chatbot = gr.Chatbot(height=500, label="Bartender Chat", type="messages")
            msg = gr.Textbox(label="3. Your Message", placeholder="Ask for a drink suggestion...")
            send_btn = gr.Button("Mix It Up", variant="primary")

    # Event wiring: ingestion button, plus Enter-key and button chat submit.
    ingest_btn.click(ingest_recipes, file_up, status)
    msg.submit(bartend, [msg, chatbot, img, inv_state], [chatbot, inv_state, debug_gallery])
    send_btn.click(bartend, [msg, chatbot, img, inv_state], [chatbot, inv_state, debug_gallery])
|
|
| if __name__ == "__main__": |
| demo.launch(theme=gr.themes.Soft()) |