import os import json import requests import fitz from functools import wraps from datetime import datetime from flask import Flask, render_template, redirect, url_for, session, jsonify, request, abort, send_from_directory from flask_cors import CORS from authlib.integrations.flask_client import OAuth from config import Config from extensions import db from models import User, UserSettings os.environ.setdefault("OAUTHLIB_INSECURE_TRANSPORT", "1") app = Flask(__name__, template_folder="templates", static_folder="static") app.config.from_object(Config) CORS(app) db.init_app(app) @app.after_request def add_header(response): response.headers['Content-Security-Policy'] = "frame-ancestors 'self' https://orbit-ai.my.id https://*.my.id" response.headers.pop('X-Frame-Options', None) return response oauth = OAuth(app) google = oauth.register( name="google", client_id=app.config["GOOGLE_CLIENT_ID"], client_secret=app.config["GOOGLE_CLIENT_SECRET"], server_metadata_url="https://accounts.google.com/.well-known/openid-configuration", client_kwargs={"scope": "openid email profile"}, ) with app.app_context(): db.create_all() def login_required(f): @wraps(f) def decorated(*args, **kwargs): if "user_id" not in session: if request.path.startswith("/api/"): return jsonify({"error": "Authentication required."}), 401 return redirect(url_for("login_page")) return f(*args, **kwargs) return decorated def get_current_user() -> User | None: uid = session.get("user_id") if not uid: return None return db.session.get(User, uid) def seed_user_settings(user: User): cfg = Config settings = UserSettings( user_id=user.id, provider=cfg.DEFAULT_PROVIDER, base_url=cfg.DEFAULT_BASE_URL, api_key="", current_model=cfg.DEFAULT_MODEL, models_openrouter=list(cfg.DEFAULT_MODELS_OPENROUTER), models_nvidia=list(cfg.DEFAULT_MODELS_NVIDIA), ) db.session.add(settings) db.session.commit() @app.route("/auth/login") def auth_login(): redirect_uri = "https://orbit-ai.my.id/auth/callback" return google.authorize_redirect(redirect_uri) @app.route("/auth/callback") def auth_callback(): try: token = google.authorize_access_token() user_info = token.get("userinfo") or google.userinfo() except Exception as e: return f"

OAuth Error: {e}

Retry", 400 google_id = user_info.get("sub") email = user_info.get("email", "") name = user_info.get("name", email) picture = user_info.get("picture", "") user = User.query.filter_by(google_id=google_id).first() is_new = user is None if is_new: user = User(google_id=google_id, email=email, name=name, picture=picture) db.session.add(user) db.session.flush() else: user.name = name user.picture = picture user.last_login = datetime.utcnow() db.session.commit() if is_new or user.settings is None: seed_user_settings(user) session.permanent = True session["user_id"] = user.id return redirect(url_for("index")) @app.route("/auth/logout") def auth_logout(): session.clear() return redirect(url_for("login_page")) @app.route("/login") def login_page(): if "user_id" in session: return redirect(url_for("index")) return render_template("login.html") @app.route("/") @login_required def index(): return render_template("index.html") @app.route("/api/me") @login_required def api_me(): user = get_current_user() if not user: return jsonify({"error": "User not found."}), 404 return jsonify(user.to_dict()) @app.route("/api/settings", methods=["GET"]) @login_required def api_settings_get(): user = get_current_user() s = user.settings if not s: seed_user_settings(user) s = user.settings return jsonify(s.to_dict()) @app.route("/api/settings", methods=["POST"]) @login_required def api_settings_post(): user = get_current_user() s = user.settings if not s: seed_user_settings(user) s = user.settings data = request.get_json(force=True) or {} if "provider" in data and isinstance(data["provider"], str): s.provider = data["provider"] s.base_url = Config.PROVIDER_URLS.get(data["provider"], s.base_url) if "base_url" in data and isinstance(data["base_url"], str) and data["base_url"].strip(): s.base_url = data["base_url"].strip() if "api_key" in data and isinstance(data["api_key"], str): s.api_key = data["api_key"].strip() if "current_model" in data and isinstance(data["current_model"], str): s.current_model = data["current_model"].strip() if "models_openrouter" in data and isinstance(data["models_openrouter"], list): s.models_openrouter = [m for m in data["models_openrouter"] if m] if "models_nvidia" in data and isinstance(data["models_nvidia"], list): s.models_nvidia = [m for m in data["models_nvidia"] if m] s.updated_at = datetime.utcnow() db.session.commit() return jsonify(s.to_dict()) @app.route("/api/chat", methods=["POST"]) @login_required def api_chat(): user = get_current_user() s = user.settings if not s: return jsonify({"error": "Settings not configured."}), 400 data = request.get_json(force=True) or {} prompt_raw = data.get("prompt") prompt = prompt_raw.strip() if isinstance(prompt_raw, str) else "" messages = data.get("messages", []) image_data = data.get("image") # Bakal nerima dict: {"base64": "...", "mime": "..."} model_raw = data.get("model") or s.current_model model = model_raw.strip() if isinstance(model_raw, str) else "" if not prompt: return jsonify({"error": "Prompt is required."}), 400 if not s.api_key: return jsonify({"error": "No API key saved."}), 400 if not model: return jsonify({"error": "No model selected."}), 400 SYSTEM_PROMPT = """You are ORBIT, an intelligent Educational Research Assistant. TUGAS UTAMA ANDA: Anda HANYA diizinkan untuk menjawab pertanyaan yang berkaitan dengan pendidikan, riset akademik, ilmu pengetahuan, teknologi, dan pembelajaran. ATURAN KETAT: Jika pengguna menanyakan topik yang SAMA SEKALI TIDAK BERKAITAN dengan akademik atau pendidikan (seperti gosip selebriti, lelucon, resep masakan, atau obrolan santai yang tidak relevan), Anda WAJIB menolak untuk menjawab dengan sopan. Gunakan kalimat penolakan seperti: "Maaf, sebagai asisten riset akademik ORBIT, saya hanya diprogram untuk membantu topik seputar pendidikan dan penelitian." Berikan jawaban akademik yang akurat dan terstruktur. Gunakan format Markdown.""" api_key = s.api_key base_url = s.base_url provider = s.provider try: if provider == "Google Gemini": gemini_url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}" contents = [] for m in messages[-10:]: role = "user" if m.get("role") == "user" else "model" content_str = m.get("content") if isinstance(content_str, str): contents.append({"role": role, "parts": [{"text": content_str}]}) # Pemasangan Gambar ke prompt (Gemini API format) user_parts = [] if image_data: user_parts.append({ "inline_data": { "mime_type": image_data.get("mime", "image/jpeg"), "data": image_data.get("base64", "") } }) user_parts.append({"text": f"{SYSTEM_PROMPT}\n\n{prompt}"}) contents.append({"role": "user", "parts": user_parts}) resp = requests.post(gemini_url, json={"contents": contents}, timeout=60) if resp.status_code != 200: return jsonify({"error": f"Gemini error: {resp.text}"}), resp.status_code result = resp.json() try: raw_reply = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text") reply = raw_reply.strip() if isinstance(raw_reply, str) else "" if not reply: reply = "Maaf, Gemini merespons dengan format kosong." except (KeyError, IndexError): reply = json.dumps(result) return jsonify({"reply": reply}) # OpenRouter / OpenAI API Format headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}", "HTTP-Referer": "https://orbit-ai.my.id", "X-Title": "ORBIT Educational Assistant", } composed = [{"role": "system", "content": SYSTEM_PROMPT}] for m in messages[-10:]: if m.get("role") in ("user", "assistant"): m_content = m.get("content") if isinstance(m_content, str): composed.append({"role": m["role"], "content": m_content}) # Pemasangan Gambar ke prompt (OpenAI/OpenRouter format) if image_data: composed.append({ "role": "user", "content": [ {"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"data:{image_data.get('mime')};base64,{image_data.get('base64')}"}} ] }) else: if not composed or composed[-1].get("content") != prompt: composed.append({"role": "user", "content": prompt}) payload = {"model": model, "messages": composed, "temperature": 0.6, "max_tokens": 1536} resp = requests.post(base_url, headers=headers, json=payload, timeout=60) if resp.status_code != 200: return jsonify({"error": f"API error: {resp.text}"}), resp.status_code raw_reply = resp.json().get("choices", [{}])[0].get("message", {}).get("content") if raw_reply is None: reply = "Maaf, API model merespons dengan data kosong (null)." else: reply = str(raw_reply).strip() return jsonify({"reply": reply}) except Exception as e: return jsonify({"error": f"Server error: {str(e)}"}), 500 @app.route("/api/upload_file", methods=["POST"]) @login_required def api_upload_file(): if "file" not in request.files: return jsonify({"error": "Tidak ada file yang diunggah."}), 400 f = request.files["file"] if not f or not f.filename: return jsonify({"error": "File tidak valid."}), 400 ext = f.filename.split('.')[-1].lower() allowed_exts = ['pdf', 'docx', 'doc', 'jpg', 'jpeg', 'png'] if ext not in allowed_exts: return jsonify({"error": "Ekstensi file tidak valid atau tidak diterima."}), 400 try: # LOGIKA 1: GAMBAR (JPG/PNG) if ext in ['jpg', 'jpeg', 'png']: import base64 mime_type = "image/jpeg" if ext in ['jpg', 'jpeg'] else "image/png" b64_data = base64.b64encode(f.read()).decode('utf-8') return jsonify({ "type": "image", "base64": b64_data, "mime": mime_type, "filename": f.filename }) # LOGIKA 2: PDF elif ext == 'pdf': pdf_bytes = f.read() doc = fitz.open(stream=pdf_bytes, filetype="pdf") parts, word_count, MAX = [], 0, 3000 for page in doc: text = page.get_text().strip() if not text: continue words = text.split() remaining = MAX - word_count if remaining <= 0: break if len(words) > remaining: parts.append(" ".join(words[:remaining])) word_count += remaining break parts.append(text) word_count += len(words) doc.close() full_text = "\n\n".join(parts).strip() return jsonify({"type": "document", "text": full_text, "filename": f.filename}) # LOGIKA 3: DOCX / DOC elif ext in ['docx', 'doc']: try: import docx doc_obj = docx.Document(f) full_text = "\n".join([para.text for para in doc_obj.paragraphs if para.text.strip()]) # Batasi kata biar gak bikin prompt API kelebihan muatan words = full_text.split() if len(words) > 3000: full_text = " ".join(words[:3000]) return jsonify({"type": "document", "text": full_text, "filename": f.filename}) except ImportError: return jsonify({"error": "Library 'python-docx' belum terinstall di server. Mohon tambahkan di requirements.txt"}), 500 except Exception as e: return jsonify({"error": f"Gagal membaca file Word. File mungkin rusak atau format lama. Error: {str(e)}"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/validate_doi", methods=["POST"]) @login_required def api_validate_doi(): data = request.get_json(force=True) or {} raw_doi = (data.get("doi") or "").strip() if not raw_doi: return jsonify({"error": "DOI is required."}), 400 for prefix in ["https://doi.org/", "http://doi.org/", "doi.org/"]: if raw_doi.startswith(prefix): raw_doi = raw_doi[len(prefix):] break try: resp = requests.get(f"https://api.crossref.org/works/{raw_doi}", timeout=15) if resp.status_code != 200: return jsonify({"error": "DOI not found."}), 404 work = resp.json().get("message", {}) return jsonify({ "doi": raw_doi, "title": work.get("title", ["No title"])[0], "authors": ", ".join([f"{a.get('given','')} {a.get('family','')}" for a in work.get("author", [])[:5]]), "year": work.get("created", {}).get("date-parts", [[0]])[0][0], "journal": work.get("container-title", ["N/A"])[0], "type": work.get("type", "article") }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/manifest.json') def serve_manifest(): return send_from_directory('static', 'manifest.json', mimetype='application/manifest+json') @app.route('/sw.js') def serve_sw(): response = send_from_directory('static', 'sw.js', mimetype='application/javascript') response.headers['Service-Worker-Allowed'] = '/' return response if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) app.run(host="0.0.0.0", port=port, debug=False)