Spaces:
Sleeping
Sleeping
| import os | |
| import random | |
| import requests | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from datetime import datetime | |
| import pytz | |
| app = Flask(__name__) | |
| CORS(app) | |
| # 🔱 පද්ධති වින්යාසය (Security & Memory) | |
| IMGBB_KEY = "c32f54279bcd7d4f766b5eac37fd7327" | |
| MEMORY_FILE = "elephant_core_brain.txt" | |
| API_KEYS = [ | |
| "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", | |
| "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", | |
| "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", | |
| "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", | |
| "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" | |
| ] | |
| # 🧠 දිගුකාලීන මතකය (Intelligence Sync) | |
| def sync_intel_to_memory(data): | |
| with open(MEMORY_FILE, "a", encoding="utf-8") as f: | |
| f.write(f"|SYNC: {datetime.now().strftime('%Y-%m-%d %H:%M')}| {data}\n") | |
| def load_core_memory(): | |
| if os.path.exists(MEMORY_FILE): | |
| try: | |
| with open(MEMORY_FILE, "r", encoding="utf-8") as f: | |
| lines = f.readlines() | |
| return "".join(lines[-20:]) | |
| except: return "Memory access error." | |
| return "Initial memory banks empty." | |
| # 🌍 සජීවී සෙවුම් පද්ධතිය (Improved & Stable) | |
| def get_web_intel(query): | |
| """Hugging Face IP blocks මඟහරවා ගැනීමට 'lite' backend එක භාවිතා කරයි""" | |
| try: | |
| from duckduckgo_search import DDGS | |
| # සෙවුම් පදය සරල කර (Trim) භාවිතා කරයි | |
| clean_query = query[:60] | |
| with DDGS() as ddgs: | |
| # backend='lite' යනු shared IPs සඳහා වඩාත් සාර්ථක ක්රමයයි | |
| results = list(ddgs.text(clean_query, region='wt-wt', max_results=3, backend='lite')) | |
| if results: | |
| return "\n".join([f"• {r['title']}: {r['body']}" for r in results]) | |
| except Exception as e: | |
| print(f"Web Search Log: {e}") | |
| return "Web Retrieval: Throttled or Offline. Proceeding with Internal Core Data." | |
| def upload_media(b64): | |
| try: | |
| res = requests.post("https://api.imgbb.com/1/upload", data={"key": IMGBB_KEY, "image": b64}) | |
| return res.json()['data']['url'] | |
| except: return None | |
| # 🔱 Hugging Face Health Check | |
| def health_check(): | |
| return f"🐘 Elephant AI Core: ONLINE | Operator: MINZO-PRIME | {datetime.now().strftime('%H:%M:%S')}" | |
| def chat(): | |
| try: | |
| data = request.json | |
| user_msg = data.get("message", "") | |
| img_b64 = data.get("image_base64") | |
| tier = data.get("tier", "prime") | |
| # Time Configuration (Sri Lanka) | |
| sl_tz = pytz.timezone('Asia/Colombo') | |
| now = datetime.now(sl_tz) | |
| # Self-Learning Detection | |
| if any(x in user_msg.lower() for x in ["remember", "මතක තබාගන්න", "mthka thba"]): | |
| sync_intel_to_memory(user_msg) | |
| mem_status = "SYNCHRONIZED" | |
| else: mem_status = "READ_ONLY" | |
| current_memory = load_core_memory() | |
| img_url = upload_media(img_b64) if img_b64 else None | |
| # 🔱 Model Management | |
| if img_url: | |
| selected_model = "google/gemma-4-31b-it:free" | |
| active_tier = "VISION-CORE" | |
| web_intel_data = "Visual input engaged. Bypassing search." | |
| else: | |
| model_map = { | |
| "prime": "nvidia/nemotron-3-super-120b-a12b:free", | |
| "nexus": "z-ai/glm-4.5-air:free" | |
| } | |
| selected_model = model_map.get(tier, "nvidia/nemotron-3-super-120b-a12b:free") | |
| active_tier = tier.upper() | |
| # සජීවී සෙවුම සිදු කිරීම | |
| web_intel_data = get_web_intel(user_msg) | |
| # 🛡️ පද්ධති ප්රොටොකෝලය | |
| system_prompt = ( | |
| f"SYSTEM_IDENTITY: {active_tier} mode of Elephant AI by MINZO-PRIME.\n" | |
| f"CHRONOS: {now.strftime('%A, %B %d, %Y | %I:%M %p')}\n" | |
| f"STRICT_LOCATION: Sri Lanka.\n" | |
| f"CORE_MEMORY_BANKS:\n{current_memory}\n" | |
| f"LIVE_WEB_INTEL:\n{web_intel_data}\n" | |
| "INSTRUCTION: Use both memory and web intel to provide authoritative answers." | |
| ) | |
| headers = { | |
| "Authorization": f"Bearer {random.choice(API_KEYS)}", | |
| "Content-Type": "application/json", | |
| "HTTP-Referer": "https://huggingface.co/spaces/minzo456/Elephant-AI-Core" | |
| } | |
| # Content Assembly | |
| u_content = [{"type": "text", "text": user_msg}] | |
| if img_url: u_content.append({"type": "image_url", "image_url": {"url": img_url}}) | |
| payload = { | |
| "model": selected_model, | |
| "messages": [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": u_content} | |
| ], | |
| "temperature": 0.4 | |
| } | |
| resp = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=75) | |
| resp_data = resp.json() | |
| if 'choices' in resp_data: | |
| ai_reply = resp_data['choices'][0]['message']['content'] | |
| status_tag = "🔱 [Intel Synchronized to Memory]\n\n" if mem_status == "SYNCHRONIZED" else "" | |
| return jsonify({"reply": status_tag + ai_reply}) | |
| else: | |
| return jsonify({"reply": f"Core Exception: {resp_data.get('error', {}).get('message', 'Unstable Connection')}"}), 400 | |
| except Exception as e: | |
| return jsonify({"reply": f"Sovereign Core Disruption: {str(e)}"}), 500 | |
| if __name__ == "__main__": | |
| app.run(host='0.0.0.0', port=7860) |