ORBIT / app.py
xenux4u's picture
Update app.py
cbdac19 verified
import os
import json
import requests
import fitz
from functools import wraps
from datetime import datetime
from flask import Flask, render_template, redirect, url_for, session, jsonify, request, abort, send_from_directory
from flask_cors import CORS
from authlib.integrations.flask_client import OAuth
from config import Config
from extensions import db
from models import User, UserSettings
os.environ.setdefault("OAUTHLIB_INSECURE_TRANSPORT", "1")
app = Flask(__name__, template_folder="templates", static_folder="static")
app.config.from_object(Config)
CORS(app)
db.init_app(app)
@app.after_request
def add_header(response):
response.headers['Content-Security-Policy'] = "frame-ancestors 'self' https://orbit-ai.my.id https://*.my.id"
response.headers.pop('X-Frame-Options', None)
return response
oauth = OAuth(app)
google = oauth.register(
name="google",
client_id=app.config["GOOGLE_CLIENT_ID"],
client_secret=app.config["GOOGLE_CLIENT_SECRET"],
server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
client_kwargs={"scope": "openid email profile"},
)
with app.app_context():
db.create_all()
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if "user_id" not in session:
if request.path.startswith("/api/"):
return jsonify({"error": "Authentication required."}), 401
return redirect(url_for("login_page"))
return f(*args, **kwargs)
return decorated
def get_current_user() -> User | None:
uid = session.get("user_id")
if not uid:
return None
return db.session.get(User, uid)
def seed_user_settings(user: User):
cfg = Config
settings = UserSettings(
user_id=user.id,
provider=cfg.DEFAULT_PROVIDER,
base_url=cfg.DEFAULT_BASE_URL,
api_key="",
current_model=cfg.DEFAULT_MODEL,
models_openrouter=list(cfg.DEFAULT_MODELS_OPENROUTER),
models_nvidia=list(cfg.DEFAULT_MODELS_NVIDIA),
)
db.session.add(settings)
db.session.commit()
@app.route("/auth/login")
def auth_login():
redirect_uri = "https://orbit-ai.my.id/auth/callback"
return google.authorize_redirect(redirect_uri)
@app.route("/auth/callback")
def auth_callback():
try:
token = google.authorize_access_token()
user_info = token.get("userinfo") or google.userinfo()
except Exception as e:
return f"<h3>OAuth Error: {e}</h3><a href='/'>Retry</a>", 400
google_id = user_info.get("sub")
email = user_info.get("email", "")
name = user_info.get("name", email)
picture = user_info.get("picture", "")
user = User.query.filter_by(google_id=google_id).first()
is_new = user is None
if is_new:
user = User(google_id=google_id, email=email, name=name, picture=picture)
db.session.add(user)
db.session.flush()
else:
user.name = name
user.picture = picture
user.last_login = datetime.utcnow()
db.session.commit()
if is_new or user.settings is None:
seed_user_settings(user)
session.permanent = True
session["user_id"] = user.id
return redirect(url_for("index"))
@app.route("/auth/logout")
def auth_logout():
session.clear()
return redirect(url_for("login_page"))
@app.route("/login")
def login_page():
if "user_id" in session:
return redirect(url_for("index"))
return render_template("login.html")
@app.route("/")
@login_required
def index():
return render_template("index.html")
@app.route("/api/me")
@login_required
def api_me():
user = get_current_user()
if not user:
return jsonify({"error": "User not found."}), 404
return jsonify(user.to_dict())
@app.route("/api/settings", methods=["GET"])
@login_required
def api_settings_get():
user = get_current_user()
s = user.settings
if not s:
seed_user_settings(user)
s = user.settings
return jsonify(s.to_dict())
@app.route("/api/settings", methods=["POST"])
@login_required
def api_settings_post():
user = get_current_user()
s = user.settings
if not s:
seed_user_settings(user)
s = user.settings
data = request.get_json(force=True) or {}
if "provider" in data and isinstance(data["provider"], str):
s.provider = data["provider"]
s.base_url = Config.PROVIDER_URLS.get(data["provider"], s.base_url)
if "base_url" in data and isinstance(data["base_url"], str) and data["base_url"].strip():
s.base_url = data["base_url"].strip()
if "api_key" in data and isinstance(data["api_key"], str):
s.api_key = data["api_key"].strip()
if "current_model" in data and isinstance(data["current_model"], str):
s.current_model = data["current_model"].strip()
if "models_openrouter" in data and isinstance(data["models_openrouter"], list):
s.models_openrouter = [m for m in data["models_openrouter"] if m]
if "models_nvidia" in data and isinstance(data["models_nvidia"], list):
s.models_nvidia = [m for m in data["models_nvidia"] if m]
s.updated_at = datetime.utcnow()
db.session.commit()
return jsonify(s.to_dict())
@app.route("/api/chat", methods=["POST"])
@login_required
def api_chat():
user = get_current_user()
s = user.settings
if not s:
return jsonify({"error": "Settings not configured."}), 400
data = request.get_json(force=True) or {}
prompt_raw = data.get("prompt")
prompt = prompt_raw.strip() if isinstance(prompt_raw, str) else ""
messages = data.get("messages", [])
image_data = data.get("image") # Bakal nerima dict: {"base64": "...", "mime": "..."}
model_raw = data.get("model") or s.current_model
model = model_raw.strip() if isinstance(model_raw, str) else ""
if not prompt:
return jsonify({"error": "Prompt is required."}), 400
if not s.api_key:
return jsonify({"error": "No API key saved."}), 400
if not model:
return jsonify({"error": "No model selected."}), 400
SYSTEM_PROMPT = """You are ORBIT, an intelligent Educational Research Assistant.
TUGAS UTAMA ANDA: Anda HANYA diizinkan untuk menjawab pertanyaan yang berkaitan dengan pendidikan, riset akademik, ilmu pengetahuan, teknologi, dan pembelajaran.
ATURAN KETAT: Jika pengguna menanyakan topik yang SAMA SEKALI TIDAK BERKAITAN dengan akademik atau pendidikan (seperti gosip selebriti, lelucon, resep masakan, atau obrolan santai yang tidak relevan), Anda WAJIB menolak untuk menjawab dengan sopan.
Gunakan kalimat penolakan seperti: "Maaf, sebagai asisten riset akademik ORBIT, saya hanya diprogram untuk membantu topik seputar pendidikan dan penelitian."
Berikan jawaban akademik yang akurat dan terstruktur. Gunakan format Markdown."""
api_key = s.api_key
base_url = s.base_url
provider = s.provider
try:
if provider == "Google Gemini":
gemini_url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}"
contents = []
for m in messages[-10:]:
role = "user" if m.get("role") == "user" else "model"
content_str = m.get("content")
if isinstance(content_str, str):
contents.append({"role": role, "parts": [{"text": content_str}]})
# Pemasangan Gambar ke prompt (Gemini API format)
user_parts = []
if image_data:
user_parts.append({
"inline_data": {
"mime_type": image_data.get("mime", "image/jpeg"),
"data": image_data.get("base64", "")
}
})
user_parts.append({"text": f"{SYSTEM_PROMPT}\n\n{prompt}"})
contents.append({"role": "user", "parts": user_parts})
resp = requests.post(gemini_url, json={"contents": contents}, timeout=60)
if resp.status_code != 200:
return jsonify({"error": f"Gemini error: {resp.text}"}), resp.status_code
result = resp.json()
try:
raw_reply = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text")
reply = raw_reply.strip() if isinstance(raw_reply, str) else ""
if not reply:
reply = "Maaf, Gemini merespons dengan format kosong."
except (KeyError, IndexError):
reply = json.dumps(result)
return jsonify({"reply": reply})
# OpenRouter / OpenAI API Format
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
"HTTP-Referer": "https://orbit-ai.my.id",
"X-Title": "ORBIT Educational Assistant",
}
composed = [{"role": "system", "content": SYSTEM_PROMPT}]
for m in messages[-10:]:
if m.get("role") in ("user", "assistant"):
m_content = m.get("content")
if isinstance(m_content, str):
composed.append({"role": m["role"], "content": m_content})
# Pemasangan Gambar ke prompt (OpenAI/OpenRouter format)
if image_data:
composed.append({
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": f"data:{image_data.get('mime')};base64,{image_data.get('base64')}"}}
]
})
else:
if not composed or composed[-1].get("content") != prompt:
composed.append({"role": "user", "content": prompt})
payload = {"model": model, "messages": composed, "temperature": 0.6, "max_tokens": 1536}
resp = requests.post(base_url, headers=headers, json=payload, timeout=60)
if resp.status_code != 200:
return jsonify({"error": f"API error: {resp.text}"}), resp.status_code
raw_reply = resp.json().get("choices", [{}])[0].get("message", {}).get("content")
if raw_reply is None:
reply = "Maaf, API model merespons dengan data kosong (null)."
else:
reply = str(raw_reply).strip()
return jsonify({"reply": reply})
except Exception as e:
return jsonify({"error": f"Server error: {str(e)}"}), 500
@app.route("/api/upload_file", methods=["POST"])
@login_required
def api_upload_file():
if "file" not in request.files:
return jsonify({"error": "Tidak ada file yang diunggah."}), 400
f = request.files["file"]
if not f or not f.filename:
return jsonify({"error": "File tidak valid."}), 400
ext = f.filename.split('.')[-1].lower()
allowed_exts = ['pdf', 'docx', 'doc', 'jpg', 'jpeg', 'png']
if ext not in allowed_exts:
return jsonify({"error": "Ekstensi file tidak valid atau tidak diterima."}), 400
try:
# LOGIKA 1: GAMBAR (JPG/PNG)
if ext in ['jpg', 'jpeg', 'png']:
import base64
mime_type = "image/jpeg" if ext in ['jpg', 'jpeg'] else "image/png"
b64_data = base64.b64encode(f.read()).decode('utf-8')
return jsonify({
"type": "image",
"base64": b64_data,
"mime": mime_type,
"filename": f.filename
})
# LOGIKA 2: PDF
elif ext == 'pdf':
pdf_bytes = f.read()
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
parts, word_count, MAX = [], 0, 3000
for page in doc:
text = page.get_text().strip()
if not text: continue
words = text.split()
remaining = MAX - word_count
if remaining <= 0: break
if len(words) > remaining:
parts.append(" ".join(words[:remaining]))
word_count += remaining
break
parts.append(text)
word_count += len(words)
doc.close()
full_text = "\n\n".join(parts).strip()
return jsonify({"type": "document", "text": full_text, "filename": f.filename})
# LOGIKA 3: DOCX / DOC
elif ext in ['docx', 'doc']:
try:
import docx
doc_obj = docx.Document(f)
full_text = "\n".join([para.text for para in doc_obj.paragraphs if para.text.strip()])
# Batasi kata biar gak bikin prompt API kelebihan muatan
words = full_text.split()
if len(words) > 3000:
full_text = " ".join(words[:3000])
return jsonify({"type": "document", "text": full_text, "filename": f.filename})
except ImportError:
return jsonify({"error": "Library 'python-docx' belum terinstall di server. Mohon tambahkan di requirements.txt"}), 500
except Exception as e:
return jsonify({"error": f"Gagal membaca file Word. File mungkin rusak atau format lama. Error: {str(e)}"}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/validate_doi", methods=["POST"])
@login_required
def api_validate_doi():
data = request.get_json(force=True) or {}
raw_doi = (data.get("doi") or "").strip()
if not raw_doi: return jsonify({"error": "DOI is required."}), 400
for prefix in ["https://doi.org/", "http://doi.org/", "doi.org/"]:
if raw_doi.startswith(prefix):
raw_doi = raw_doi[len(prefix):]
break
try:
resp = requests.get(f"https://api.crossref.org/works/{raw_doi}", timeout=15)
if resp.status_code != 200: return jsonify({"error": "DOI not found."}), 404
work = resp.json().get("message", {})
return jsonify({
"doi": raw_doi,
"title": work.get("title", ["No title"])[0],
"authors": ", ".join([f"{a.get('given','')} {a.get('family','')}" for a in work.get("author", [])[:5]]),
"year": work.get("created", {}).get("date-parts", [[0]])[0][0],
"journal": work.get("container-title", ["N/A"])[0],
"type": work.get("type", "article")
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/manifest.json')
def serve_manifest():
return send_from_directory('static', 'manifest.json', mimetype='application/manifest+json')
@app.route('/sw.js')
def serve_sw():
response = send_from_directory('static', 'sw.js', mimetype='application/javascript')
response.headers['Service-Worker-Allowed'] = '/'
return response
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
app.run(host="0.0.0.0", port=port, debug=False)