| import os |
| import json |
| import random |
| import asyncio |
| import re |
| import fcntl |
| import time |
| from datetime import datetime |
| from concurrent.futures import ThreadPoolExecutor |
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.responses import HTMLResponse, JSONResponse |
| from pydantic import BaseModel |
| import google.generativeai as genai |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| app = FastAPI() |
|
|
| |
| MAX_FREE_DAILY_REQUESTS = 5 |
| USAGE_DB_FILE = "usage_data.json" |
|
|
| |
| all_keys_str = os.getenv("ALL_GEMINI_API_KEYS", "") |
| raw_keys = re.split(r'[,\s\n]+', all_keys_str) |
|
|
| GEMINI_KEYS = [] |
| for k in raw_keys: |
| clean_key = k.strip().replace('"', '').replace("'", "").replace("\r", "").replace("\n", "") |
| if len(clean_key) > 30: |
| GEMINI_KEYS.append(clean_key) |
|
|
| if GEMINI_KEYS: |
| print(f"Loaded {len(GEMINI_KEYS)} keys.") |
| else: |
| print("Warning: No valid keys found.") |
|
|
| executor = ThreadPoolExecutor(max_workers=10) |
|
|
| class IdeaRequest(BaseModel): |
| idea: str |
| fingerprint: str |
| is_premium: bool = False |
| is_instrumental: bool = False |
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def read_root(): |
| try: |
| with open("index.html", "r", encoding="utf-8") as f: |
| return f.read() |
| except FileNotFoundError: |
| return "<h1>Error: index.html not found</h1>" |
|
|
| def sync_generate_content(prompt): |
| max_retries = 3 |
| if not GEMINI_KEYS: |
| return None |
|
|
| for _ in range(max_retries): |
| try: |
| current_key = random.choice(GEMINI_KEYS) |
| genai.configure(api_key=current_key) |
| model = genai.GenerativeModel('gemini-2.5-flash') |
| response = model.generate_content(prompt) |
| clean_json = response.text.replace("```json", "").replace("```", "").strip() |
| return json.loads(clean_json) |
| except Exception: |
| continue |
| return None |
|
|
| def check_and_increment_usage(ip: str, fingerprint: str): |
| today = datetime.now().strftime("%Y-%m-%d") |
| if not os.path.exists(USAGE_DB_FILE): |
| try: |
| with open(USAGE_DB_FILE, 'w') as f: |
| json.dump({}, f) |
| except OSError: |
| pass |
|
|
| try: |
| f = open(USAGE_DB_FILE, 'r+') |
| except FileNotFoundError: |
| f = open(USAGE_DB_FILE, 'w+') |
| json.dump({}, f) |
| f.seek(0) |
|
|
| with f: |
| try: |
| fcntl.flock(f, fcntl.LOCK_EX) |
| try: |
| content = f.read() |
| data = json.loads(content) if content else {} |
| except json.JSONDecodeError: |
| data = {} |
|
|
| ip_key = f"ip:{ip}" |
| fp_key = f"fp:{fingerprint}" |
| |
| ip_record = data.get(ip_key, {"date": today, "count": 0}) |
| if ip_record["date"] != today: |
| ip_record = {"date": today, "count": 0} |
| |
| fp_record = data.get(fp_key, {"date": today, "count": 0}) |
| if fp_record["date"] != today: |
| fp_record = {"date": today, "count": 0} |
|
|
| if ip_record["count"] >= MAX_FREE_DAILY_REQUESTS: |
| return False |
| if fingerprint and fp_record["count"] >= MAX_FREE_DAILY_REQUESTS: |
| return False |
|
|
| ip_record["count"] += 1 |
| data[ip_key] = ip_record |
| |
| if fingerprint: |
| fp_record["count"] += 1 |
| data[fp_key] = fp_record |
|
|
| f.seek(0) |
| f.truncate() |
| json.dump(data, f) |
| f.flush() |
| return True |
|
|
| finally: |
| fcntl.flock(f, fcntl.LOCK_UN) |
|
|
| @app.post("/api/refine") |
| async def refine_text(request: IdeaRequest, req: Request): |
| if not GEMINI_KEYS: |
| raise HTTPException(status_code=500, detail="کلید API تنظیم نشده است.") |
|
|
| if not request.is_premium: |
| client_ip = req.client.host |
| client_fp = request.fingerprint |
| try: |
| allowed = check_and_increment_usage(client_ip, client_fp) |
| except Exception: |
| return JSONResponse(content={"error": "System Error checking limits"}, status_code=500) |
| |
| if not allowed: |
| return JSONResponse(content={"error": "LIMIT_REACHED"}, status_code=429) |
|
|
| |
| instruction = "" |
| if request.is_instrumental: |
| instruction = """ |
| **INSTRUMENTAL MODE ACTIVE:** |
| - The user wants an instrumental song (NO LYRICS). |
| - **lyrics field MUST be an empty string ""**. |
| - Focus entirely on the `music_prompt` to describe the instruments and mood vividly. |
| """ |
| else: |
| instruction = """ |
| - **Lyrics (Match User's Language):** |
| - Detect the language of the "User Input". |
| - Write the lyrics entirely in that detected language. |
| - Structure tags: [Verse 1], [Chorus], etc. |
| - "lyrics" field must contain ONLY words to be sung. |
| """ |
|
|
| prompt = f""" |
| You are a professional songwriter and music producer. |
| Task: Convert the user's input into a COMPLETE song structure and a music generation prompt. |
| |
| {instruction} |
| |
| 1. **Music Prompt (English):** Describe the mood, instruments, BPM, and style suitable for an AI music generator. |
| |
| Output strictly in JSON format: |
| {{ |
| "music_prompt": "YOUR ENGLISH PROMPT HERE", |
| "lyrics": "YOUR FULL LYRICS HERE (OR EMPTY STRING IF INSTRUMENTAL)" |
| }} |
| |
| User Input: {request.idea} |
| """ |
| |
| loop = asyncio.get_running_loop() |
| result = await loop.run_in_executor(executor, sync_generate_content, prompt) |
| |
| if result: |
| return JSONResponse(content=result) |
| else: |
| return JSONResponse(content={"error": "Server busy. Please try again later."}, status_code=503) |