import base64
import html
import json
import os
from datetime import datetime
from functools import wraps
from urllib.parse import quote

import fitz
import requests
from authlib.integrations.flask_client import OAuth
from flask import Flask, render_template, redirect, url_for, session, jsonify, request, abort, send_from_directory
from flask_cors import CORS

from config import Config
from extensions import db
from models import User, UserSettings
|
|
# Allow OAuth flows over plain HTTP unless the environment already set a value.
# NOTE(review): this is the oauthlib/requests-oauthlib switch; Authlib documents
# AUTHLIB_INSECURE_TRANSPORT instead -- confirm which one is actually needed.
os.environ.setdefault("OAUTHLIB_INSECURE_TRANSPORT", "1")


# Application object: load settings from Config, then attach CORS and the database.
app = Flask(
    __name__,
    static_folder="static",
    template_folder="templates",
)
app.config.from_object(Config)
CORS(app)
db.init_app(app)
|
|
@app.after_request
def add_header(response):
    """Set the framing policy on every outgoing response.

    Framing is governed by the CSP ``frame-ancestors`` directive, so any
    ``X-Frame-Options`` header already on the response is removed.

    Args:
        response: The response object about to be sent.

    Returns:
        The same response object with its headers adjusted.
    """
    frame_policy = "frame-ancestors 'self' https://orbit-ai.my.id https://*.my.id"
    response.headers.pop('X-Frame-Options', None)
    response.headers['Content-Security-Policy'] = frame_policy
    return response
|
|
# Google sign-in: endpoints are discovered from Google's OpenID Connect
# metadata document; credentials come from the application config.
_GOOGLE_DISCOVERY_URL = "https://accounts.google.com/.well-known/openid-configuration"
_GOOGLE_SCOPES = "openid email profile"

oauth = OAuth(app)
google = oauth.register(
    name="google",
    client_id=app.config["GOOGLE_CLIENT_ID"],
    client_secret=app.config["GOOGLE_CLIENT_SECRET"],
    server_metadata_url=_GOOGLE_DISCOVERY_URL,
    client_kwargs={"scope": _GOOGLE_SCOPES},
)


# Create any tables that do not exist yet; this needs an application context.
with app.app_context():
    db.create_all()
|
|
def login_required(f):
    """Decorate a view so that it only runs for signed-in sessions.

    When the session has no ``user_id``, requests whose path starts with
    ``/api/`` receive a JSON 401 response; every other request is redirected
    to the login page.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if "user_id" in session:
            return f(*args, **kwargs)
        wants_json = request.path.startswith("/api/")
        if wants_json:
            return jsonify({"error": "Authentication required."}), 401
        return redirect(url_for("login_page"))

    return decorated
|
|
def get_current_user() -> User | None:
    """Return the ``User`` for the session's ``user_id``.

    Returns:
        The user loaded through ``db.session.get``, or ``None`` when the
        session holds no (truthy) ``user_id``.
    """
    user_id = session.get("user_id")
    return db.session.get(User, user_id) if user_id else None
|
|
def seed_user_settings(user: User):
    """Create and commit a ``UserSettings`` row for ``user`` from Config defaults.

    The API key starts empty and the model lists are copied so the new row
    does not share list objects with ``Config``.

    Args:
        user: The user that will own the new settings row.
    """
    defaults = {
        "provider": Config.DEFAULT_PROVIDER,
        "base_url": Config.DEFAULT_BASE_URL,
        "api_key": "",
        "current_model": Config.DEFAULT_MODEL,
        "models_openrouter": list(Config.DEFAULT_MODELS_OPENROUTER),
        "models_nvidia": list(Config.DEFAULT_MODELS_NVIDIA),
    }
    db.session.add(UserSettings(user_id=user.id, **defaults))
    db.session.commit()
|
|
@app.route("/auth/login")
def auth_login():
    """Start the Google OAuth flow by redirecting to the consent screen.

    The callback URL defaults to the production address and can be overridden
    with an ``OAUTH_REDIRECT_URI`` config value, so other deployments (staging,
    local development) do not need a code change.

    Returns:
        The redirect response produced by the OAuth client.
    """
    redirect_uri = app.config.get(
        "OAUTH_REDIRECT_URI", "https://orbit-ai.my.id/auth/callback"
    )
    return google.authorize_redirect(redirect_uri)
|
|
@app.route("/auth/callback")
def auth_callback():
    """Finish the Google OAuth flow and sign the user in.

    Exchanges the authorization response for a token, reads the user profile,
    creates the ``User`` row on first login (or refreshes name, picture and
    last-login time on later logins), makes sure a settings row exists, and
    stores the user id in a permanent session.

    Returns:
        A redirect to the index page on success, or a small HTML error page
        with status 400 when the OAuth exchange fails or the profile carries
        no account identifier.
    """
    try:
        token = google.authorize_access_token()
        user_info = token.get("userinfo") or google.userinfo()
    except Exception as e:
        # The exception text can contain values taken from the callback's
        # query string, so it must be escaped before being placed in HTML.
        app.logger.warning("OAuth callback failed: %s", e)
        return f"<h3>OAuth Error: {html.escape(str(e))}</h3><a href='/'>Retry</a>", 400

    google_id = user_info.get("sub")
    if not google_id:
        # Without a subject identifier the account cannot be matched or created.
        return "<h3>OAuth Error: missing account identifier</h3><a href='/'>Retry</a>", 400

    email = user_info.get("email", "")
    name = user_info.get("name", email)
    picture = user_info.get("picture", "")

    user = User.query.filter_by(google_id=google_id).first()
    is_new = user is None

    if is_new:
        user = User(google_id=google_id, email=email, name=name, picture=picture)
        db.session.add(user)
        # Flush so the new row gets its primary key before the commit below.
        db.session.flush()
    else:
        user.name = name
        user.picture = picture
        user.last_login = datetime.utcnow()

    db.session.commit()

    if is_new or user.settings is None:
        seed_user_settings(user)

    session.permanent = True
    session["user_id"] = user.id
    return redirect(url_for("index"))
|
|
@app.route("/auth/logout")
def auth_logout():
    """Clear the whole session and redirect to the login page."""
    session.clear()
    login_url = url_for("login_page")
    return redirect(login_url)
|
|
@app.route("/login")
def login_page():
    """Render the login page, or redirect to the index when already signed in."""
    if "user_id" not in session:
        return render_template("login.html")
    return redirect(url_for("index"))
|
|
@app.route("/")
@login_required
def index():
    """Render the main application page for a signed-in user."""
    page = render_template("index.html")
    return page
|
|
@app.route("/api/me")
@login_required
def api_me():
    """Return the signed-in user's profile as JSON.

    Returns:
        The user's ``to_dict()`` output, or a JSON error with status 404 when
        the session's user can no longer be loaded.
    """
    current = get_current_user()
    if current:
        return jsonify(current.to_dict())
    return jsonify({"error": "User not found."}), 404
|
|
@app.route("/api/settings", methods=["GET"])
@login_required
def api_settings_get():
    """Return the signed-in user's settings as JSON, creating them if missing.

    Returns:
        The settings' ``to_dict()`` output, or a JSON error with status 404
        when the session's user can no longer be loaded.
    """
    user = get_current_user()
    if not user:
        # The session can outlive the user row; answer like /api/me does
        # instead of failing on ``user.settings``.
        return jsonify({"error": "User not found."}), 404

    if not user.settings:
        seed_user_settings(user)
    return jsonify(user.settings.to_dict())
|
|
@app.route("/api/settings", methods=["POST"])
@login_required
def api_settings_post():
    """Update the signed-in user's settings from a JSON object.

    Only recognised keys with values of the expected type are applied; other
    keys are ignored. Choosing a provider also resets ``base_url`` to that
    provider's URL from ``Config.PROVIDER_URLS`` when one is listed, and an
    explicit non-empty ``base_url`` in the same request takes precedence.

    Returns:
        The updated settings as JSON; a JSON error with status 404 when the
        session's user can no longer be loaded, or status 400 when the body is
        JSON but not an object.
    """
    user = get_current_user()
    if not user:
        return jsonify({"error": "User not found."}), 404

    s = user.settings
    if not s:
        seed_user_settings(user)
        s = user.settings

    data = request.get_json(force=True) or {}
    if not isinstance(data, dict):
        # e.g. a JSON array: key lookups below would raise instead of validating.
        return jsonify({"error": "Request body must be a JSON object."}), 400

    provider = data.get("provider")
    if isinstance(provider, str):
        s.provider = provider
        s.base_url = Config.PROVIDER_URLS.get(provider, s.base_url)

    base_url = data.get("base_url")
    if isinstance(base_url, str) and base_url.strip():
        s.base_url = base_url.strip()

    api_key = data.get("api_key")
    if isinstance(api_key, str):
        s.api_key = api_key.strip()

    current_model = data.get("current_model")
    if isinstance(current_model, str):
        s.current_model = current_model.strip()

    # Model lists keep only non-blank strings so that stray nulls, numbers or
    # objects from the client never end up stored as model names.
    for field in ("models_openrouter", "models_nvidia"):
        models = data.get(field)
        if isinstance(models, list):
            cleaned = [m.strip() for m in models if isinstance(m, str) and m.strip()]
            setattr(s, field, cleaned)

    s.updated_at = datetime.utcnow()
    db.session.commit()
    return jsonify(s.to_dict())
|
|
# System prompt sent with every chat request; it restricts the assistant to
# educational and research topics.
_CHAT_SYSTEM_PROMPT = """You are ORBIT, an intelligent Educational Research Assistant.
TUGAS UTAMA ANDA: Anda HANYA diizinkan untuk menjawab pertanyaan yang berkaitan dengan pendidikan, riset akademik, ilmu pengetahuan, teknologi, dan pembelajaran.
ATURAN KETAT: Jika pengguna menanyakan topik yang SAMA SEKALI TIDAK BERKAITAN dengan akademik atau pendidikan (seperti gosip selebriti, lelucon, resep masakan, atau obrolan santai yang tidak relevan), Anda WAJIB menolak untuk menjawab dengan sopan.
Gunakan kalimat penolakan seperti: "Maaf, sebagai asisten riset akademik ORBIT, saya hanya diprogram untuk membantu topik seputar pendidikan dan penelitian."
Berikan jawaban akademik yang akurat dan terstruktur. Gunakan format Markdown."""

_CHAT_HISTORY_LIMIT = 10  # number of most recent history entries forwarded upstream
_CHAT_TIMEOUT_SECONDS = 60
_DEFAULT_IMAGE_MIME = "image/jpeg"


def _usable_history(messages):
    """Return the last few history entries that are dicts with string content."""
    if not isinstance(messages, list):
        return []
    return [
        m for m in messages[-_CHAT_HISTORY_LIMIT:]
        if isinstance(m, dict) and isinstance(m.get("content"), str)
    ]


def _without_repeated_prompt(turns, prompt):
    """Drop a trailing user turn that merely repeats ``prompt``.

    The prompt is always appended as the final user message, so a copy already
    present at the end of the history would be sent twice.
    """
    if turns and turns[-1]["role"] == "user" and turns[-1]["content"] == prompt:
        return turns[:-1]
    return turns


def _gemini_chat(api_key, model, history, prompt, image_data):
    """Send the conversation to the Gemini ``generateContent`` endpoint."""
    # The key travels in a header rather than the query string so it cannot
    # leak through URLs in logs or in exception messages echoed to the client.
    url = (
        "https://generativelanguage.googleapis.com/v1beta/models/"
        f"{quote(model, safe='')}:generateContent"
    )
    turns = _without_repeated_prompt(
        [
            # Gemini only knows "user" and "model"; every other role maps to "model".
            {"role": "user" if m.get("role") == "user" else "model", "content": m["content"]}
            for m in history
        ],
        prompt,
    )
    contents = [{"role": t["role"], "parts": [{"text": t["content"]}]} for t in turns]

    user_parts = []
    if image_data:
        user_parts.append({
            "inline_data": {
                "mime_type": image_data.get("mime") or _DEFAULT_IMAGE_MIME,
                "data": image_data.get("base64") or "",
            }
        })
    # The system prompt is prepended to the final user text.
    user_parts.append({"text": f"{_CHAT_SYSTEM_PROMPT}\n\n{prompt}"})
    contents.append({"role": "user", "parts": user_parts})

    resp = requests.post(
        url,
        headers={"x-goog-api-key": api_key},
        json={"contents": contents},
        timeout=_CHAT_TIMEOUT_SECONDS,
    )
    if resp.status_code != 200:
        return jsonify({"error": f"Gemini error: {resp.text}"}), resp.status_code

    result = resp.json()
    try:
        raw_reply = (
            result.get("candidates", [{}])[0]
            .get("content", {})
            .get("parts", [{}])[0]
            .get("text")
        )
    except (AttributeError, IndexError, KeyError, TypeError):
        # Unexpected response shape: hand the raw payload back for inspection.
        return jsonify({"reply": json.dumps(result)})

    reply = raw_reply.strip() if isinstance(raw_reply, str) else ""
    return jsonify({"reply": reply or "Maaf, Gemini merespons dengan format kosong."})


def _openai_compatible_chat(base_url, api_key, model, history, prompt, image_data):
    """Send the conversation to an OpenAI-style chat completions endpoint."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://orbit-ai.my.id",
        "X-Title": "ORBIT Educational Assistant",
    }
    turns = _without_repeated_prompt(
        [
            {"role": m["role"], "content": m["content"]}
            for m in history
            if m.get("role") in ("user", "assistant")
        ],
        prompt,
    )

    if image_data:
        mime = image_data.get("mime") or _DEFAULT_IMAGE_MIME
        encoded = image_data.get("base64") or ""
        final_content = [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{encoded}"}},
        ]
    else:
        final_content = prompt

    composed = [
        {"role": "system", "content": _CHAT_SYSTEM_PROMPT},
        *turns,
        {"role": "user", "content": final_content},
    ]
    payload = {"model": model, "messages": composed, "temperature": 0.6, "max_tokens": 1536}
    resp = requests.post(base_url, headers=headers, json=payload, timeout=_CHAT_TIMEOUT_SECONDS)
    if resp.status_code != 200:
        return jsonify({"error": f"API error: {resp.text}"}), resp.status_code

    # Walk the response defensively: a missing or empty ``choices`` list or a
    # null ``message`` is reported as an empty reply, not a server error.
    body = resp.json()
    choices = body.get("choices") if isinstance(body, dict) else None
    first = choices[0] if isinstance(choices, list) and choices else {}
    message = first.get("message") if isinstance(first, dict) else None
    raw_reply = message.get("content") if isinstance(message, dict) else None

    if raw_reply is None:
        reply = "Maaf, API model merespons dengan data kosong (null)."
    else:
        reply = str(raw_reply).strip()
    return jsonify({"reply": reply})


@app.route("/api/chat", methods=["POST"])
@login_required
def api_chat():
    """Relay a chat prompt to the user's configured model provider.

    Reads ``prompt``, optional ``messages`` (history), optional ``image``
    (an object with ``mime`` and ``base64``) and optional ``model`` from the
    JSON body. The provider, base URL and API key come from the user's saved
    settings. History entries that are not objects with string content are
    ignored.

    Returns:
        ``{"reply": ...}`` on success. A JSON error with status 404 when the
        session's user can no longer be loaded; status 400 for missing
        settings, prompt, API key or model, or a non-object body; the upstream
        status code when the provider rejects the request; status 500 for any
        other failure.
    """
    user = get_current_user()
    if not user:
        return jsonify({"error": "User not found."}), 404
    s = user.settings
    if not s:
        return jsonify({"error": "Settings not configured."}), 400

    data = request.get_json(force=True) or {}
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    prompt_raw = data.get("prompt")
    prompt = prompt_raw.strip() if isinstance(prompt_raw, str) else ""

    model_raw = data.get("model") or s.current_model
    model = model_raw.strip() if isinstance(model_raw, str) else ""

    image_data = data.get("image")
    if not isinstance(image_data, dict):
        # Anything other than an object cannot carry mime/base64 fields.
        image_data = None

    if not prompt:
        return jsonify({"error": "Prompt is required."}), 400
    if not s.api_key:
        return jsonify({"error": "No API key saved."}), 400
    if not model:
        return jsonify({"error": "No model selected."}), 400

    history = _usable_history(data.get("messages"))

    try:
        if s.provider == "Google Gemini":
            return _gemini_chat(s.api_key, model, history, prompt, image_data)
        return _openai_compatible_chat(s.base_url, s.api_key, model, history, prompt, image_data)
    except Exception as e:
        # Request boundary: log the traceback and keep the JSON error contract.
        app.logger.exception("Chat request failed")
        return jsonify({"error": f"Server error: {str(e)}"}), 500
|
|
_UPLOAD_WORD_LIMIT = 3000  # maximum number of words returned from a document
_IMAGE_MIME_TYPES = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png"}
_WORD_EXTENSIONS = ("docx", "doc")
_ALLOWED_UPLOAD_EXTENSIONS = ("pdf", *_WORD_EXTENSIONS, *_IMAGE_MIME_TYPES)


def _extract_pdf_text(pdf_bytes, max_words=_UPLOAD_WORD_LIMIT):
    """Return the text of a PDF, page by page, capped at ``max_words`` words."""
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        parts, word_count = [], 0
        for page in doc:
            text = page.get_text().strip()
            if not text:
                continue
            remaining = max_words - word_count
            if remaining <= 0:
                break
            words = text.split()
            if len(words) > remaining:
                # This page crosses the cap: keep only the words that still fit.
                parts.append(" ".join(words[:remaining]))
                break
            parts.append(text)
            word_count += len(words)
    finally:
        # Release the document even when a page fails to parse.
        doc.close()
    return "\n\n".join(parts).strip()


def _extract_docx_text(file_obj, max_words=_UPLOAD_WORD_LIMIT):
    """Return the non-blank paragraphs of a Word file, capped at ``max_words`` words.

    Raises:
        ImportError: If ``python-docx`` is not installed.
    """
    import docx  # optional dependency, imported lazily so its absence can be reported

    document = docx.Document(file_obj)
    full_text = "\n".join(p.text for p in document.paragraphs if p.text.strip())
    words = full_text.split()
    if len(words) > max_words:
        full_text = " ".join(words[:max_words])
    return full_text


@app.route("/api/upload_file", methods=["POST"])
@login_required
def api_upload_file():
    """Turn an uploaded file into content the chat endpoint can use.

    Images (jpg, jpeg, png) are returned base64-encoded with their MIME type.
    PDF and Word files are returned as extracted text capped at
    ``_UPLOAD_WORD_LIMIT`` words.

    Returns:
        A JSON object with ``type`` ("image" or "document"), the payload and
        the original ``filename``; a JSON error with status 400 for a missing
        file, an invalid file or a disallowed extension, and status 500 when
        reading the file fails.
    """
    if "file" not in request.files:
        return jsonify({"error": "Tidak ada file yang diunggah."}), 400
    f = request.files["file"]
    if not f or not f.filename:
        return jsonify({"error": "File tidak valid."}), 400

    # Require a real "name.ext" form: a bare name such as "pdf" has no extension.
    _, dot, suffix = f.filename.rpartition(".")
    ext = suffix.lower() if dot else ""
    if ext not in _ALLOWED_UPLOAD_EXTENSIONS:
        return jsonify({"error": "Ekstensi file tidak valid atau tidak diterima."}), 400

    try:
        if ext in _IMAGE_MIME_TYPES:
            b64_data = base64.b64encode(f.read()).decode('utf-8')
            return jsonify({
                "type": "image",
                "base64": b64_data,
                "mime": _IMAGE_MIME_TYPES[ext],
                "filename": f.filename,
            })

        if ext == "pdf":
            full_text = _extract_pdf_text(f.read())
            return jsonify({"type": "document", "text": full_text, "filename": f.filename})

        # Remaining allowed extensions are Word documents.
        try:
            full_text = _extract_docx_text(f)
        except ImportError:
            return jsonify({"error": "Library 'python-docx' belum terinstall di server. Mohon tambahkan di requirements.txt"}), 500
        except Exception as e:
            return jsonify({"error": f"Gagal membaca file Word. File mungkin rusak atau format lama. Error: {str(e)}"}), 500
        return jsonify({"type": "document", "text": full_text, "filename": f.filename})

    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
# Resolver and scheme prefixes that may precede the bare DOI (matched case-insensitively).
_DOI_PREFIXES = (
    "https://doi.org/",
    "http://doi.org/",
    "https://dx.doi.org/",
    "http://dx.doi.org/",
    "doi.org/",
    "dx.doi.org/",
    "doi:",
)


def _normalize_doi(value):
    """Return the bare DOI from user input, or "" when the input is unusable."""
    if not isinstance(value, str):
        return ""
    doi = value.strip()
    lowered = doi.lower()
    for prefix in _DOI_PREFIXES:
        if lowered.startswith(prefix):
            return doi[len(prefix):].strip()
    return doi


def _first_item(value, default):
    """Return the first element of a non-empty list, else ``default``."""
    if isinstance(value, list) and value:
        return value[0]
    return default


def _crossref_year(work, field):
    """Return the year from ``work[field]["date-parts"]``, or None when absent."""
    date_info = work.get(field)
    date_parts = date_info.get("date-parts") if isinstance(date_info, dict) else None
    first_date = _first_item(date_parts, None)
    return _first_item(first_date, None)


def _author_name(author):
    """Return "given family" for a person, or the ``name`` field otherwise."""
    if not isinstance(author, dict):
        return ""
    full_name = f"{author.get('given', '')} {author.get('family', '')}".strip()
    return full_name or str(author.get("name", "")).strip()


@app.route("/api/validate_doi", methods=["POST"])
@login_required
def api_validate_doi():
    """Look a DOI up on Crossref and return a short citation summary.

    Accepts a JSON body with a ``doi`` field, either bare or prefixed with a
    doi.org / dx.doi.org URL or ``doi:``.

    Returns:
        A JSON object with ``doi``, ``title``, ``authors`` (up to five, comma
        separated), ``year``, ``journal`` and ``type``; a JSON error with
        status 400 when no DOI is supplied, status 404 when Crossref does not
        answer with 200, and status 500 for any other failure.
    """
    data = request.get_json(force=True) or {}
    raw_doi = _normalize_doi(data.get("doi") if isinstance(data, dict) else None)
    if not raw_doi:
        return jsonify({"error": "DOI is required."}), 400

    try:
        # Quote the DOI so characters such as "#" or "?" stay part of the path.
        resp = requests.get(f"https://api.crossref.org/works/{quote(raw_doi)}", timeout=15)
        if resp.status_code != 200:
            return jsonify({"error": "DOI not found."}), 404

        payload = resp.json()
        work = payload.get("message") if isinstance(payload, dict) else None
        if not isinstance(work, dict):
            work = {}

        authors = work.get("author")
        names = [_author_name(a) for a in authors[:5]] if isinstance(authors, list) else []

        # Prefer the publication date; ``created`` is when the DOI record was
        # registered and is only used as a fallback.
        year = _crossref_year(work, "issued") or _crossref_year(work, "created") or 0

        return jsonify({
            "doi": raw_doi,
            # Crossref may return an empty list for these, not only omit them.
            "title": _first_item(work.get("title"), "No title"),
            "authors": ", ".join(n for n in names if n),
            "year": year,
            "journal": _first_item(work.get("container-title"), "N/A"),
            "type": work.get("type", "article"),
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
@app.route('/manifest.json')
def serve_manifest():
    """Serve the web app manifest from the static folder at the site root."""
    manifest_mime = 'application/manifest+json'
    return send_from_directory('static', 'manifest.json', mimetype=manifest_mime)
|
|
@app.route('/sw.js')
def serve_sw():
    """Serve the service worker script from the static folder at the site root.

    The ``Service-Worker-Allowed`` header is set to ``/`` on the response.
    """
    sw_response = send_from_directory('static', 'sw.js', mimetype='application/javascript')
    sw_response.headers['Service-Worker-Allowed'] = '/'
    return sw_response
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT and falls back to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    app.run(debug=False, host="0.0.0.0", port=listen_port)