| import os |
| import re |
| import logging |
| import uuid |
| import time |
| from datetime import datetime, timezone, timedelta |
| from collections import defaultdict |
| from typing import Optional, Dict, Any |
| import asyncio |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request |
| from fastapi.responses import StreamingResponse |
| from pydantic import BaseModel, Field |
|
|
| import openai |
| import google.generativeai as genai |
| from google.generativeai.types import GenerationConfig |
|
|
| |
# Configure root logging once at import time so every module logger inherits
# the same timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
| |
# Fallback endpoint for the OpenAI-compatible custom API used by /chat
# (overridable via the CUSTOM_API_BASE_URL env var).
CUSTOM_API_BASE_URL_DEFAULT = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1"
# Default model name sent to the custom API (overridable via CUSTOM_API_MODEL).
CUSTOM_API_MODEL_DEFAULT = "gemma3:27b"
# Gemini model used when a task request does not specify one.
DEFAULT_GEMINI_MODEL = "gemini-2.0-flash"
# Upper bound (seconds) for a single Gemini generate_content call.
GEMINI_REQUEST_TIMEOUT_SECONDS = 300


# In-memory task store: task_id -> {status, result, error, timestamps, ...}.
# NOTE(review): not persistent and process-local — tasks are lost on restart
# and invisible to other worker processes; confirm single-worker deployment.
tasks_db: Dict[str, Dict[str, Any]] = {}
|
|
| |
class ChatPayload(BaseModel):
    """Request body for the /chat streaming endpoint."""
    message: str  # User prompt forwarded verbatim to the custom API.
    temperature: float = Field(0.6, ge=0.0, le=1.0)  # Sampling temperature, validated to [0.0, 1.0].
|
|
class GeminiTaskRequest(BaseModel):
    """Submission payload for an asynchronous Gemini task."""
    message: str  # Prompt text sent to Gemini.
    url: Optional[str] = None  # Optional YouTube URL attached as video input when it matches is_video_url_for_gemini.
    gemini_model: Optional[str] = None  # Overrides DEFAULT_GEMINI_MODEL when set.
    api_key: Optional[str] = Field(None, description="Gemini API Key (optional; uses Space secret if not provided)")
|
|
class TaskSubmissionResponse(BaseModel):
    """Response returned by /gemini/submit_task once a task is queued."""
    task_id: str  # UUID identifying the queued task.
    status: str  # Initial lifecycle state ("PENDING" at submission time).
    task_detail_url: str  # Relative URL to poll for this task's status.
|
|
class TaskStatusResponse(BaseModel):
    """Polling response describing a task's current lifecycle state."""
    task_id: str  # UUID of the task being queried.
    status: str  # One of PENDING / PROCESSING / COMPLETED / FAILED (see tasks_db writers).
    submitted_at: datetime  # When the task was accepted (UTC).
    last_updated_at: datetime  # Last state transition time (UTC).
    result: Optional[str] = None  # Gemini output text when COMPLETED.
    error: Optional[str] = None  # Failure description when FAILED.
| |
|
|
|
|
| |
class RateLimiter:
    """Sliding-window, per-IP request limiter backed by in-memory timestamps.

    Not thread/process-safe and not persistent; suitable for a single-worker
    deployment only.
    """

    def __init__(self, max_requests: int, time_window: timedelta):
        """Allow at most ``max_requests`` per IP within ``time_window``."""
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove request timestamps that fell outside the time window."""
        cutoff = time.time() - self.time_window.total_seconds()
        kept = [ts for ts in self.requests[user_ip] if ts > cutoff]
        if kept:
            self.requests[user_ip] = kept
        else:
            # Fix: drop empty per-IP entries so the table does not grow
            # without bound as one-off clients come and go.
            self.requests.pop(user_ip, None)

    def is_rate_limited(self, user_ip: str) -> bool:
        """Return True if the IP is over its limit; otherwise record the request.

        Fix: the previous version appended a timestamp even for *rejected*
        requests, so a blocked client that kept retrying extended its own
        window indefinitely. Rejected attempts are no longer counted.
        """
        self._cleanup_old_requests(user_ip)
        if len(self.requests[user_ip]) >= self.max_requests:
            return True
        self.requests[user_ip].append(time.time())
        return False

    def get_current_count(self, user_ip: str) -> int:
        """Return the number of requests recorded for this IP in the window."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests[user_ip])
|
|
|
|
| |
# Shared limiter instance: 50 requests per client IP per rolling 24 hours.
rate_limiter = RateLimiter(
    max_requests=50,
    time_window=timedelta(days=1)
)
|
|
def get_user_ip(request: Request) -> str:
    """Best-effort client IP: first hop of X-Forwarded-For, else the socket peer.

    NOTE(review): X-Forwarded-For is client-controlled unless a trusted proxy
    overwrites it — confirm the deployment sits behind one before relying on
    this value for rate limiting.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Fix: entries are comma(+space)-separated; strip whitespace so
        # "1.2.3.4, 5.6.7.8" cannot yield a " 1.2.3.4"-style rate-limit key.
        return forwarded.split(",")[0].strip()
    return request.client.host
|
|
|
|
class ApiRotator:
    """Maintains a list of API endpoints, serving the last-successful one first."""

    def __init__(self, apis):
        """Store the endpoint list; nothing has succeeded yet."""
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        """Return the endpoints with the last-successful entry moved to the front.

        If no success has been recorded, the original list is returned as-is.
        """
        idx = self.last_successful_index
        if idx is None:
            return self.apis
        # Promote the winner; keep everything else in original order.
        others = [api for pos, api in enumerate(self.apis) if pos != idx]
        return [self.apis[idx]] + others

    def update_last_successful(self, index):
        """Record which endpoint index most recently succeeded."""
        self.last_successful_index = index
|
|
|
|
| |
# FastAPI application instance; this metadata appears in the generated
# OpenAPI/Swagger docs.
app = FastAPI(
    title="Dual Chat & Async Gemini API",
    description="Made by Cody from chrunos.com.",
    version="2.0.0"
)
|
|
| |
def is_video_url_for_gemini(url: Optional[str]) -> bool:
    """Return True when *url* looks like a YouTube video link Gemini can ingest.

    Matches youtube.com / youtu.be / youtube-nocookie.com watch/embed/short
    forms (with an 11-character video ID), plus googleusercontent-proxied
    youtube.com URLs. None/empty input returns False.
    """
    if not url:
        return False

    # BUG FIX: both patterns previously used "https_?://" — a literal "https"
    # followed by an optional underscore — which silently rejected plain
    # "http://" links. The intended pattern is "https?://" (optional "s").
    youtube_regex = (
        r'(https?://)?(www\.)?'
        r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
    )

    googleusercontent_youtube_regex = r'https?://googleusercontent\.com/youtube\.com/\w+'

    return re.match(youtube_regex, url) is not None or \
           re.match(googleusercontent_youtube_regex, url) is not None
|
|
async def process_gemini_request_background(
    task_id: str,
    user_message: str,
    input_url: Optional[str],
    requested_gemini_model: str,
    gemini_key_to_use: str
):
    """Background worker that calls Gemini and records the outcome in tasks_db.

    Mutates tasks_db[task_id] in place: status moves PROCESSING ->
    COMPLETED (setting "result") or FAILED (setting "error"), and
    "last_updated_at" is always refreshed on exit.

    Args:
        task_id: Key of the pre-created record in tasks_db.
        user_message: Prompt text sent to Gemini.
        input_url: Optional URL; attached as video content when it matches
            the YouTube patterns in is_video_url_for_gemini.
        requested_gemini_model: Gemini model name to instantiate.
        gemini_key_to_use: API key passed to genai.configure.
    """
    logger.info(f"[Task {task_id}] Starting background Gemini processing. Model: {requested_gemini_model}, URL: {input_url}")
    tasks_db[task_id]["status"] = "PROCESSING"
    tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)

    try:
        # NOTE(review): genai.configure sets process-global SDK state; tasks
        # running concurrently with different keys could interleave — confirm
        # a single key per deployment.
        genai.configure(api_key=gemini_key_to_use)

        model_instance = genai.GenerativeModel(model_name=requested_gemini_model)

        content_parts = [{"text": user_message}]
        if input_url and is_video_url_for_gemini(input_url):
            logger.info(f"[Task {task_id}] Adding video URL to Gemini content: {input_url}")
            content_parts.append({
                "file_data": {
                    "mime_type": "video/youtube",
                    "file_uri": input_url
                }
            })

        gemini_contents = [{"parts": content_parts}]

        generation_config = GenerationConfig(candidate_count=1)
        request_options = {"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS}

        logger.info(f"[Task {task_id}] Sending request to Gemini API...")
        response = await model_instance.generate_content_async(
            gemini_contents,
            stream=False,
            generation_config=generation_config,
            request_options=request_options
        )

        # Extract text: prefer the aggregate .text accessor, fall back to
        # concatenating the individual response parts.
        full_response_text = ""
        if hasattr(response, 'text') and response.text:
            full_response_text = response.text
        elif hasattr(response, 'parts'):
            for part in response.parts:
                if hasattr(part, 'text'):
                    full_response_text += part.text
        else:
            logger.warning(f"[Task {task_id}] Gemini response structure not as expected or empty. Response: {response}")

        # Distinguish "blocked by safety filters" from "genuinely empty".
        if not full_response_text and response.prompt_feedback and response.prompt_feedback.block_reason:
            block_reason_name = response.prompt_feedback.block_reason.name if hasattr(response.prompt_feedback.block_reason, 'name') else str(response.prompt_feedback.block_reason)
            logger.warning(f"[Task {task_id}] Gemini content blocked: {block_reason_name}")
            tasks_db[task_id]["status"] = "FAILED"
            tasks_db[task_id]["error"] = f"Content blocked by Gemini due to: {block_reason_name}"
        elif full_response_text:
            logger.info(f"[Task {task_id}] Gemini processing successful. Result length: {len(full_response_text)}")
            tasks_db[task_id]["status"] = "COMPLETED"
            tasks_db[task_id]["result"] = full_response_text
        else:
            logger.warning(f"[Task {task_id}] Gemini processing completed but no text content found and no block reason.")
            tasks_db[task_id]["status"] = "FAILED"
            tasks_db[task_id]["error"] = "Gemini returned no content and no specific block reason."

    except Exception as e:
        logger.error(f"[Task {task_id}] Error during Gemini background processing: {e}", exc_info=True)
        tasks_db[task_id]["status"] = "FAILED"
        tasks_db[task_id]["error"] = str(e)
    finally:
        # Always stamp the final transition time, success or failure.
        tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)
|
|
| |
|
|
@app.post("/chat", response_class=StreamingResponse)
async def direct_chat(payload: ChatPayload, request: Request):
    """Stream a chat completion from the custom OpenAI-compatible API.

    Enforces the per-IP daily rate limit, then proxies the user's message to
    the configured custom endpoint and streams the model's text deltas back
    as plain text.

    Raises:
        HTTPException: 429 when the caller exceeds the daily limit; 500 when
            CUSTOM_API_SECRET_KEY is not configured.
    """
    logger.info(f"Direct chat request received. Temperature: {payload.temperature}, Message: '{payload.message[:50]}...'")
    user_ip = get_user_ip(request)

    if rate_limiter.is_rate_limited(user_ip):
        # Fix: the count was previously computed but never used — surface it
        # in the log so rate-limit hits are observable.
        current_count = rate_limiter.get_current_count(user_ip)
        logger.warning(f"Rate limit exceeded for IP {user_ip} (count: {current_count}).")
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )

    custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY")
    custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT)
    custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT)

    if not custom_api_key_secret:
        logger.error("Custom API key ('CUSTOM_API_SECRET_KEY') is not configured for /chat.")
        raise HTTPException(status_code=500, detail="Custom API key not configured.")

    async def custom_api_streamer():
        """Async generator yielding text deltas from the upstream stream."""
        client = None
        try:
            logger.info("Sending request to Custom API for /chat.")
            from openai import AsyncOpenAI
            client = AsyncOpenAI(
                api_key=custom_api_key_secret,
                base_url=custom_api_base_url,
                timeout=60.0
            )

            stream = await client.chat.completions.create(
                model=custom_api_model,
                temperature=payload.temperature,
                messages=[{"role": "user", "content": payload.message}],
                stream=True
            )

            async for chunk in stream:
                try:
                    # Prefer "reasoning_content" (exposed by some reasoning
                    # models); otherwise fall back to the regular content delta.
                    if hasattr(chunk.choices[0].delta, "reasoning_content") and chunk.choices[0].delta.reasoning_content:
                        yield chunk.choices[0].delta.reasoning_content
                    elif chunk.choices[0].delta.content is not None:
                        yield chunk.choices[0].delta.content
                except (IndexError, AttributeError):
                    # Malformed/empty chunk — skip it and keep streaming.
                    continue
                except Exception as e:
                    logger.warning(f"Skipping chunk due to error: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error during Custom API call for /chat: {e}", exc_info=True)
            # The HTTP status is already 200 once streaming starts, so emit a
            # readable message into the stream instead of raising.
            if "peer closed connection" in str(e) or "incomplete chunked read" in str(e):
                yield "Connection interrupted. Please try again."
            else:
                yield f"Error processing with Custom API: {str(e)}"

        finally:
            if client:
                try:
                    await client.close()
                except Exception as cleanup_error:
                    logger.warning(f"Error closing OpenAI client: {cleanup_error}")

    return StreamingResponse(
        custom_api_streamer(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
|
|
@app.post("/gemini/submit_task", response_model=TaskSubmissionResponse)
async def submit_gemini_task(request: GeminiTaskRequest, background_tasks: BackgroundTasks):
    """Queue an asynchronous Gemini generation task and return its ID.

    Raises:
        HTTPException: 400 when no Gemini API key is available from either
            the request body or the GEMINI_API_KEY environment secret.
    """
    task_id = str(uuid.uuid4())
    logger.info(f"Received Gemini task submission. Assigning Task ID: {task_id}. Message: '{request.message[:50]}...'")

    gemini_api_key_from_request = request.api_key
    gemini_api_key_secret = os.getenv("GEMINI_API_KEY")
    # BUG FIX: the env secret was fetched but never used. The api_key field's
    # own description promises "uses Space secret if not provided", so fall
    # back to GEMINI_API_KEY when the request omits a key.
    key_to_use = gemini_api_key_from_request or gemini_api_key_secret

    if not key_to_use:
        logger.error(f"[Task {task_id}] Gemini API Key missing for task submission.")
        raise HTTPException(status_code=400, detail="Gemini API Key required.")

    requested_model = request.gemini_model or DEFAULT_GEMINI_MODEL

    current_time = datetime.now(timezone.utc)
    tasks_db[task_id] = {
        "status": "PENDING",
        "result": None,
        "error": None,
        "submitted_at": current_time,
        "last_updated_at": current_time,
        # Security: do not persist the caller's API key in the task store.
        "request_params": request.model_dump(exclude={"api_key"})
    }

    background_tasks.add_task(
        process_gemini_request_background,
        task_id,
        request.message,
        request.url,
        requested_model,
        key_to_use
    )

    logger.info(f"[Task {task_id}] Task submitted to background processing.")
    return TaskSubmissionResponse(
        task_id=task_id,
        status="PENDING",
        task_detail_url=f"/gemini/task/{task_id}"
    )
|
|
|
|
|
|
@app.get("/gemini/task/{task_id}", response_model=TaskStatusResponse)
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")):
    """Return the current status (and result or error, if any) of a task."""
    logger.info(f"Status query for Task ID: {task_id}")
    record = tasks_db.get(task_id)
    if not record:
        logger.warning(f"Task ID not found: {task_id}")
        raise HTTPException(status_code=404, detail="Task ID not found.")

    logger.info(f"[Task {task_id}] Current status: {record['status']}")
    response_fields = {
        "task_id": task_id,
        "status": record["status"],
        "submitted_at": record["submitted_at"],
        "last_updated_at": record["last_updated_at"],
        "result": record.get("result"),
        "error": record.get("error"),
    }
    return TaskStatusResponse(**response_fields)
|
|
@app.get("/")
async def read_root():
    """Health-check endpoint confirming the service is up."""
    logger.info("Root endpoint '/' accessed (health check).")
    health_payload = {"message": "API for Direct Chat and Async Gemini Tasks is running."}
    return health_payload