| from __future__ import annotations
|
| from typing import AsyncIterable, Dict, Deque
|
| from collections import deque
|
| import requests
|
| from fastapi_poe import (
|
| PoeBot,
|
| QueryRequest,
|
| PartialResponse,
|
| SettingsRequest,
|
| SettingsResponse,
|
| make_app,
|
| )
|
| from fastapi import Header
|
| from modal import Image, App, asgi_app, Volume, Mount
|
| import os
|
| import json
|
| import re
|
| import tempfile
|
| import shutil
|
| import logging
|
|
|
|
|
# Base URL used to reach the LM Studio server; the chat endpoint is derived
# from it.
NGROK_URL = "https://fca7-2601-2c1-280-1320-b881-aac7-9186-9365.ngrok-free.app"
LM_STUDIO_CHAT_URL = NGROK_URL + "/v1/chat/completions"

# Model identifier sent with every completion request.
MODEL_NAME = "bartowski/Qwen2.5-Coder-32B-Instruct-GGUF/Qwen2.5-Coder-32B-Instruct-IQ2_M.gguf"

# Value the bot compares the incoming authorization argument against.
NEW_BOT_ACCESS_KEY = "YOUR_ACCESS_KEY_HERE"

# JSON file in which all per-user conversation histories are persisted.
VOLUME_PATH = "/data/user_histories.json"

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
)

# Maximum number of messages kept per user; each history is a deque with
# this maxlen, so the oldest entries are discarded first.
MAX_HISTORY_MESSAGES = 50


def _load_user_histories(path: str, max_messages: int) -> Dict[str, Deque[dict]]:
    """Return the histories stored at *path*, or an empty store.

    A missing file and an unreadable/malformed file both yield ``{}``; the
    latter is logged as an error instead of being raised.
    """
    if not os.path.exists(path):
        logging.info("No existing conversation history found. Initializing a new history store.")
        return {}

    try:
        with open(path, "r") as f:
            stored = json.load(f)
        histories = {}
        for user_id, history in stored.items():
            histories[user_id] = deque(history, maxlen=max_messages)
        logging.info("Loaded existing conversation histories.")
        return histories
    except Exception as e:
        logging.error(f"Failed to load user histories: {e}")
        return {}


# In-memory store: user id -> bounded deque of {"role", "content"} messages.
user_histories: Dict[str, Deque[dict]] = _load_user_histories(
    VOLUME_PATH, MAX_HISTORY_MESSAGES
)
|
|
|
class AnotherSecureLMStudioBot(PoeBot):
    """Poe bot that forwards chat turns to an LM Studio chat-completions API.

    Every user has a bounded conversation history in the module-level
    ``user_histories`` mapping. The whole history is sent with each
    completion request and written to ``VOLUME_PATH`` after each successful
    turn.
    """

    async def get_response(
        self, request: QueryRequest, authorization: str = Header(...)
    ) -> AsyncIterable[PartialResponse]:
        """Answer the latest user message using the stored conversation.

        Args:
            request: Incoming Poe query. Only the last entry of
                ``request.query`` is treated as the new user message.
            authorization: Value compared against ``NEW_BOT_ACCESS_KEY``.

        Yields:
            Exactly one ``PartialResponse``: the model reply, or an error
            message when the request cannot be served.
        """
        # NOTE(review): a mismatch is only logged and the request is still
        # served. FastAPI resolves ``Header(...)`` defaults for route handlers
        # and dependencies; whether fastapi_poe ever supplies this argument is
        # not visible here, so rejecting on mismatch could lock out every
        # request. Confirm before tightening.
        if authorization != NEW_BOT_ACCESS_KEY:
            logging.warning("Unauthorized access key used.")

        user_id = self.get_user_id(request)
        if not user_id:
            yield PartialResponse(text="Error: User identifier not provided.")
            return

        # An empty query would otherwise fail with IndexError on query[-1].
        if not request.query:
            yield PartialResponse(text="Error: No message provided.")
            return

        if user_id not in user_histories:
            user_histories[user_id] = deque(maxlen=MAX_HISTORY_MESSAGES)
            logging.info(f"Initializing new conversation history for user {user_id}.")
        conversation_history = user_histories[user_id]

        # Angle brackets are stripped from the text before it is stored.
        user_message = re.sub(r"[<>]", "", request.query[-1].content)
        conversation_history.append({"role": "user", "content": user_message})

        logging.info(f"Conversation history for user {user_id}: {list(conversation_history)}")

        # NOTE: the completion call below is a blocking ``requests.post`` with
        # a 120 s timeout, executed directly inside this coroutine.
        try:
            response_text = self.get_chat_completion_with_context(conversation_history)
        except Exception as e:
            logging.error(
                f"An error occurred while processing the request for user {user_id}: {e}",
                exc_info=True,
            )
            # Roll back the message appended above so a failed turn does not
            # leave an unanswered user entry in the context sent next time.
            # No await happens between the append and this pop, so the last
            # element is still that message.
            conversation_history.pop()
            # Exception text can contain the backend URL; keep it in the logs
            # and show the user a generic message instead.
            response_text = (
                "An error occurred while generating a response. "
                "Please try again later."
            )
        else:
            conversation_history.append({"role": "assistant", "content": response_text})
            logging.info(f"Generated response for user {user_id}: {response_text}")
            self.save_conversation_history()

        yield PartialResponse(text=response_text.strip())

    def get_user_id(self, request: QueryRequest) -> str:
        """Return the request's ``user_id``, or a shared fallback identifier.

        Requests without a (non-empty) ``user_id`` all map to
        ``"default_user_id"`` and therefore share one conversation history.
        """
        user_id = getattr(request, "user_id", None)
        return user_id if user_id else "default_user_id"

    def get_chat_completion_with_context(self, conversation_history: Deque[dict]) -> str:
        """Request a completion from LM Studio for the full conversation.

        Args:
            conversation_history: Messages (``{"role", "content"}`` dicts)
                sent verbatim as the ``messages`` field.

        Returns:
            The content of the first choice, or a fixed apology string when
            the response carries no usable text.

        Raises:
            requests.RequestException: If the HTTP call fails or the server
                answers with an error status (``raise_for_status``).
        """
        payload = {
            "model": MODEL_NAME,
            "messages": list(conversation_history),
            "temperature": 0.7,
            "max_tokens": 1024,
            "stream": False,
        }

        logging.info(f"Sending request to LM Studio with payload:\n{json.dumps(payload, indent=2)}")
        response = requests.post(LM_STUDIO_CHAT_URL, json=payload, timeout=120)
        response.raise_for_status()
        response_data = response.json()

        generated_text = ""
        if "choices" in response_data and response_data["choices"]:
            # Tolerate a null "message" or "content" in the first choice.
            message = response_data["choices"][0].get("message") or {}
            generated_text = message.get("content") or ""

        if not generated_text:
            generated_text = "I'm sorry, I couldn't generate a response. Could you please try again?"

        return generated_text

    def save_conversation_history(self):
        """Atomically write all conversation histories to ``VOLUME_PATH``.

        The JSON is written to a temporary file in the destination directory
        and then renamed over the target with ``os.replace``. Keeping both
        files in the same directory makes the final step a rename rather than
        a cross-filesystem copy, so readers never see a partially written
        file. Failures are logged, not raised.
        """
        target_dir = os.path.dirname(VOLUME_PATH) or "."
        temp_file_path = None
        try:
            with tempfile.NamedTemporaryFile(
                "w", dir=target_dir, suffix=".tmp", delete=False, encoding="utf-8"
            ) as tmp_file:
                temp_file_path = tmp_file.name
                json.dump(
                    {user_id: list(history) for user_id, history in user_histories.items()},
                    tmp_file,
                    indent=4,
                )
            os.replace(temp_file_path, VOLUME_PATH)
            temp_file_path = None  # renamed into place; nothing left to clean up
            logging.info("Successfully saved user conversation histories.")
        except Exception as e:
            logging.error(f"Failed to save user histories: {e}")
        finally:
            if temp_file_path is not None:
                # Best-effort removal of the orphaned temp file; the original
                # failure has already been logged above.
                try:
                    os.remove(temp_file_path)
                except OSError:
                    pass

    async def get_settings(self, setting: SettingsRequest) -> SettingsResponse:
        """Return the bot settings advertised to Poe.

        NOTE(review): whether every keyword below is a recognised
        ``SettingsResponse`` field in the pinned fastapi-poe version is not
        visible from this file; confirm against the library.
        """
        return SettingsResponse(
            allow_attachments=True,
            allow_images=True,
            allow_audio=True,
            allow_video=True,
            allow_links=True,
        )
|
|
|
|
|
# Packages installed into the container image, pinned to exact versions.
REQUIREMENTS = ["fastapi-poe==0.0.24", "requests==2.31.0"]

image = Image.debian_slim().pip_install(REQUIREMENTS)

# Local /data directory made available at /data inside the container.
# NOTE(review): the history file lives under /data; whether writes made
# through this mount survive between containers is not visible here; confirm.
_data_mount = Mount.from_local_dir("/data", remote_path="/data", recursive=True)

app = App("another-secure-lmstudio-poe-bot", mounts=[_data_mount])
|
|
|
@app.function(image=image)
@asgi_app()
def fastapi_app():
    """Build the ASGI application for the bot.

    Returns:
        The result of ``make_app`` for a fresh ``AnotherSecureLMStudioBot``,
        created with ``allow_without_key=True``.
    """
    # NOTE(review): NEW_BOT_ACCESS_KEY is not passed to make_app, so key
    # enforcement is presumably not done by the library here; confirm that
    # this is intended.
    return make_app(AnotherSecureLMStudioBot(), allow_without_key=True)
|
|
|