import os import random import requests from flask import Flask, request, jsonify from flask_cors import CORS from datetime import datetime import pytz app = Flask(__name__) CORS(app) # 🔱 පද්ධති වින්‍යාසය (Security & Memory) IMGBB_KEY = "c32f54279bcd7d4f766b5eac37fd7327" MEMORY_FILE = "elephant_core_brain.txt" API_KEYS = [ "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", "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", "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", "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", "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" ] # 🧠 දිගුකාලීන මතකය (Intelligence Sync) def sync_intel_to_memory(data): with open(MEMORY_FILE, "a", encoding="utf-8") as f: f.write(f"|SYNC: {datetime.now().strftime('%Y-%m-%d %H:%M')}| {data}\n") def load_core_memory(): if os.path.exists(MEMORY_FILE): try: with open(MEMORY_FILE, "r", encoding="utf-8") as f: lines = f.readlines() return "".join(lines[-20:]) except: return "Memory access error." return "Initial memory banks empty." # 🌍 සජීවී සෙවුම් පද්ධතිය (Improved & Stable) def get_web_intel(query): """Hugging Face IP blocks මඟහරවා ගැනීමට 'lite' backend එක භාවිතා කරයි""" try: from duckduckgo_search import DDGS # සෙවුම් පදය සරල කර (Trim) භාවිතා කරයි clean_query = query[:60] with DDGS() as ddgs: # backend='lite' යනු shared IPs සඳහා වඩාත් සාර්ථක ක්‍රමයයි results = list(ddgs.text(clean_query, region='wt-wt', max_results=3, backend='lite')) if results: return "\n".join([f"• {r['title']}: {r['body']}" for r in results]) except Exception as e: print(f"Web Search Log: {e}") return "Web Retrieval: Throttled or Offline. Proceeding with Internal Core Data." def upload_media(b64): try: res = requests.post("https://api.imgbb.com/1/upload", data={"key": IMGBB_KEY, "image": b64}) return res.json()['data']['url'] except: return None # 🔱 Hugging Face Health Check @app.route('/') def health_check(): return f"🐘 Elephant AI Core: ONLINE | Operator: MINZO-PRIME | {datetime.now().strftime('%H:%M:%S')}" @app.route('/api/chat', methods=['POST']) def chat(): try: data = request.json user_msg = data.get("message", "") img_b64 = data.get("image_base64") tier = data.get("tier", "prime") # Time Configuration (Sri Lanka) sl_tz = pytz.timezone('Asia/Colombo') now = datetime.now(sl_tz) # Self-Learning Detection if any(x in user_msg.lower() for x in ["remember", "මතක තබාගන්න", "mthka thba"]): sync_intel_to_memory(user_msg) mem_status = "SYNCHRONIZED" else: mem_status = "READ_ONLY" current_memory = load_core_memory() img_url = upload_media(img_b64) if img_b64 else None # 🔱 Model Management if img_url: selected_model = "google/gemma-4-31b-it:free" active_tier = "VISION-CORE" web_intel_data = "Visual input engaged. Bypassing search." else: model_map = { "prime": "nvidia/nemotron-3-super-120b-a12b:free", "nexus": "z-ai/glm-4.5-air:free" } selected_model = model_map.get(tier, "nvidia/nemotron-3-super-120b-a12b:free") active_tier = tier.upper() # සජීවී සෙවුම සිදු කිරීම web_intel_data = get_web_intel(user_msg) # 🛡️ පද්ධති ප්‍රොටොකෝලය system_prompt = ( f"SYSTEM_IDENTITY: {active_tier} mode of Elephant AI by MINZO-PRIME.\n" f"CHRONOS: {now.strftime('%A, %B %d, %Y | %I:%M %p')}\n" f"STRICT_LOCATION: Sri Lanka.\n" f"CORE_MEMORY_BANKS:\n{current_memory}\n" f"LIVE_WEB_INTEL:\n{web_intel_data}\n" "INSTRUCTION: Use both memory and web intel to provide authoritative answers." ) headers = { "Authorization": f"Bearer {random.choice(API_KEYS)}", "Content-Type": "application/json", "HTTP-Referer": "https://huggingface.co/spaces/minzo456/Elephant-AI-Core" } # Content Assembly u_content = [{"type": "text", "text": user_msg}] if img_url: u_content.append({"type": "image_url", "image_url": {"url": img_url}}) payload = { "model": selected_model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": u_content} ], "temperature": 0.4 } resp = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=75) resp_data = resp.json() if 'choices' in resp_data: ai_reply = resp_data['choices'][0]['message']['content'] status_tag = "🔱 [Intel Synchronized to Memory]\n\n" if mem_status == "SYNCHRONIZED" else "" return jsonify({"reply": status_tag + ai_reply}) else: return jsonify({"reply": f"Core Exception: {resp_data.get('error', {}).get('message', 'Unstable Connection')}"}), 400 except Exception as e: return jsonify({"reply": f"Sovereign Core Disruption: {str(e)}"}), 500 if __name__ == "__main__": app.run(host='0.0.0.0', port=7860)