openrouter_free_api / src /model_tester.py
qa1145's picture
Upload 9 files
748f7ac verified
import asyncio
import aiohttp
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
import random
from .openrouter_client import OpenRouterClient
from . import config
from .utils import clean_model_name
class ModelTester:
def __init__(self):
self.client = OpenRouterClient()
self.max_concurrency = config.get_max_concurrency()
self.test_prompt = config.get_test_prompt()
self._all_models: List[str] = []
self._free_models: List[str] = []
self._available_models: List[str] = []
self._available_free_models: List[str] = []
self._scan_in_progress = False
self._last_scan_time: Optional[datetime] = None
self.scan_result: Dict[str, Any] = {
"available_models": [],
"available_free_models": [],
"total_available": 0,
"free_available": 0,
"timestamp": None
}
def refresh_model_list(self):
"""Get latest model list from API"""
models = self.client.get_models()
all_ids = []
free_ids = []
for model in models:
model_id = model.get("id", "")
if model_id:
all_ids.append(model_id)
if ":free" in model_id:
free_ids.append(model_id)
self._all_models = all_ids
self._free_models = free_ids
return len(self._all_models), len(self._free_models)
async def test_single_model_async(
self,
session: aiohttp.ClientSession,
model_id: str,
api_key: str
) -> tuple[str, bool]:
url = "https://openrouter.ai/api/v1/chat/completions"
payload = {
"model": model_id,
"messages": [{"role": "user", "content": self.test_prompt}],
"max_tokens": 10
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
timeout = aiohttp.ClientTimeout(total=config.get_request_timeout())
async with session.post(url, json=payload, headers=headers, timeout=timeout) as response:
is_success = response.status == 200
is_free = ":free" in model_id
return model_id, is_success
except Exception:
return model_id, False
async def scan_all_models_async(self):
"""Async scan all models concurrently"""
if self._scan_in_progress:
return {"error": "Scan already in progress"}
self._scan_in_progress = True
print(f"[{datetime.now()}] Starting model scan...")
all_count, free_count = self.refresh_model_list()
print(f"Total models: {all_count}, Free models: {free_count}")
api_keys = config.get_api_keys()
api_key = random.choice(api_keys)
available: Set[str] = set()
available_free: Set[str] = set()
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(self.max_concurrency)
async def test_with_semaphore(model_id: str):
async with semaphore:
return await self.test_single_model_async(session, model_id, api_key)
tasks = [test_with_semaphore(m) for m in self._all_models]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, tuple):
model_id, success = result
cleaned = clean_model_name(model_id)
if success:
available.add(cleaned)
if ":free" in model_id:
available_free.add(cleaned)
self._available_models = sorted(list(available))
self._available_free_models = sorted(list(available_free))
self._last_scan_time = datetime.now()
self._scan_in_progress = False
self.scan_result = {
"available_models": self._available_models,
"available_free_models": self._available_free_models,
"total_available": len(self._available_models),
"free_available": len(self._available_free_models),
"timestamp": self._last_scan_time.isoformat() if self._last_scan_time else None
}
print(f"Scan complete: {len(self._available_free_models)} free, {len(self._available_models)} total available")
return self.scan_result
def scan_all_models(self):
"""Sync wrapper for scan"""
return asyncio.run(self.scan_all_models_async())
def get_available_models(self, free_only: bool = False) -> List[str]:
"""Get available models list"""
if free_only:
return self._available_free_models
return self._available_models
def get_all_free_models(self) -> List[str]:
"""Get all free models from API list (not tested)"""
return self._free_models
async def try_model_direct_stream(
self,
session: aiohttp.ClientSession,
model_id: str,
api_key: str,
messages: List[Dict[str, str]]
):
"""发送流式请求到OpenRouter,返回流式迭代器"""
url = "https://openrouter.ai/api/v1/chat/completions"
payload = {
"model": model_id,
"messages": messages,
"stream": True
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=120)) as response:
async for line in response.content:
yield line
async def try_model_direct(
self,
session: aiohttp.ClientSession,
model_id: str,
api_key: str,
prompt: str = None
) -> Optional[Dict[str, Any]]:
url = "https://openrouter.ai/api/v1/chat/completions"
payload = {
"model": model_id,
"messages": [{"role": "user", "content": prompt or self.test_prompt}]
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
timeout = aiohttp.ClientTimeout(total=config.get_request_timeout())
async with session.post(url, json=payload, headers=headers, timeout=timeout) as response:
body = await response.text()
if response.status == 200:
data = await response.json()
return {
"success": True,
"model": model_id,
"response": data,
"method": "direct"
}
else:
print(f"[try_model_direct] ERROR {model_id}: HTTP {response.status}, body: {body[:200]}")
return {
"success": False,
"model": model_id,
"error": f"HTTP {response.status}: {body[:100]}",
"method": "direct"
}
except asyncio.TimeoutError:
return {"success": False, "model": model_id, "error": "timeout", "method": "direct"}
except Exception as e:
return {"success": False, "model": model_id, "error": str(e), "method": "direct"}
async def try_best_available_model(
self,
session: aiohttp.ClientSession,
keyword: str,
api_key: str,
prompt: str = None
) -> Optional[Dict[str, Any]]:
# 第一步:从API获取最新的free模型列表
print(f"[try_best] Keyword: {keyword}, Refreshing model list...")
try:
self.refresh_model_list()
except Exception as e:
print(f"[try_best] refresh_model_list failed: {e}")
# 使用所有free模型,而不是已测试的
available_free = self.get_all_free_models()
print(f"[try_best] Found {len(available_free)} free models")
# 第二步:用关键词匹配模型(避免匹配到不完整的ID)
candidates = []
if keyword and available_free:
# 只匹配模型名部分,不匹配作者前缀
matched = []
for m in available_free:
model_name = m.replace(":free", "").split("/")[-1]
if keyword.lower() in model_name.lower():
matched.append(m)
print(f"[try_best] Keyword '{keyword}' matched: {matched[:5]}")
if matched:
candidates.extend([(m, "matched") for m in matched[:10]])
# 如果关键词没匹配或没提供关键词,随机取free模型
if not candidates and available_free:
candidates = [(m, "random") for m in available_free[:15]]
print(f"[try_best] Using random models: {candidates[:3]}")
# 如果列表为空,从API直接获取并测试
if not candidates:
print("[try_best] No candidates, fetching from API directly...")
try:
all_models = self.client.get_models()
all_free = [m.get("id", "") for m in all_models if ":free" in m.get("id", "")]
print(f"[try_best] API returned {len(all_free)} free models")
if all_free:
candidates = [(m, "api") for m in all_free[:20]]
except Exception as e:
print(f"[try_best] API fetch failed: {e}")
# 第三步:并发发送请求测试这些模型
if not candidates:
return {
"success": False,
"model": None,
"error": "No candidates available",
"method": "list_empty"
}
print(f"[try_best] Testing {len(candidates)} candidates...")
semaphore = asyncio.Semaphore(5)
async def try_one(model_id, match_type):
async with semaphore:
# 模型ID已经是完整格式(包含:free),不需要再添加
print(f"[try_best] Testing model: {model_id}")
result = await self.try_model_direct(session, model_id, api_key, prompt)
if result:
print(f"[try_best] Result for {model_id}: success={result.get('success')}, error={result.get('error', 'none')}")
else:
print(f"[try_best] Result for {model_id}: None")
return result
tasks = [try_one(m, t) for m, t in candidates]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, dict) and result.get("success"):
result["method"] = f"list_{candidates[i][1]}"
print(f"[try_best] SUCCESS with {candidates[i][0]}")
return result
print(f"[try_best] All candidates failed")
return {
"success": False,
"model": candidates[0][0] if candidates else None,
"error": "No available model found",
"method": "list_fallback"
}
def find_model_in_list(self, keyword: str) -> Optional[str]:
"""Find full model ID from keyword"""
available_free = self.get_all_free_models()
# 先精确匹配
for model in available_free:
model_name = model.replace(":free", "").split("/")[-1]
if model_name.lower() == keyword.lower():
return model
# 然后模糊匹配
for model in available_free:
if keyword.lower() in model.lower():
return model
return None
async def chat_completion(self, prompt: str, model_hint: Optional[str] = None) -> Dict[str, Any]:
api_keys = config.get_api_keys()
api_key = random.choice(api_keys)
async with aiohttp.ClientSession() as session:
tasks = []
# 方案1:用户指定模型,需要先找到完整的模型ID
if model_hint:
# 尝试在模型列表中找到匹配的完整模型ID
full_model_id = self.find_model_in_list(model_hint)
if full_model_id:
# 找到完整ID,直接使用
tasks.append(asyncio.create_task(
self.try_model_direct(session, full_model_id, api_key, prompt)
))
else:
# 没找到,尝试用原始输入(可能是完整ID)
full_model = f"{model_hint}:free" if ":free" not in model_hint else model_hint
tasks.append(asyncio.create_task(
self.try_model_direct(session, full_model, api_key, prompt)
))
tasks.append(asyncio.create_task(
self.try_best_available_model(session, model_hint or "", api_key, prompt)
))
# 等待所有任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
# 先检查方案1
result1 = results[0]
if isinstance(result1, dict) and result1.get("success"):
return {
"success": True,
"response": result1.get("response"),
"method": result1.get("method"),
"model": result1.get("model")
}
# 方案1失败,检查方案2
result2 = results[1]
if isinstance(result2, dict) and result2.get("success"):
return {
"success": True,
"response": result2.get("response"),
"method": result2.get("method"),
"model": result2.get("model")
}
# 都失败了,返回方案2的错误(更详细)
return {
"success": False,
"error": result2.get("error", "Unknown error") if isinstance(result2, dict) else "Request failed",
"method": result2.get("method", "both_failed") if isinstance(result2, dict) else "both_failed"
}
def chat_completion_sync(self, prompt: str, model_hint: Optional[str] = None) -> Dict[str, Any]:
return asyncio.run(self.chat_completion(prompt, model_hint))
def test_single_model(self, model_id: str) -> tuple[str, bool]:
is_available = self.client.test_model(model_id, self.test_prompt)
cleaned_name = clean_model_name(model_id)
return cleaned_name, is_available
def test_all_models(self) -> Dict[str, Any]:
"""Legacy sync method - use scan_all_models instead"""
return self.scan_all_models()
async def chat_completion_stream(self, model_hint: Optional[str], messages: List[Dict[str, str]]):
"""流式聊天 - 返回生成器"""
api_keys = config.get_api_keys()
api_key = random.choice(api_keys)
# 方案1:尝试用户指定的模型
if model_hint:
full_model_id = self.find_model_in_list(model_hint)
if full_model_id:
async with aiohttp.ClientSession() as session:
async for chunk in self.try_model_direct_stream(session, full_model_id, api_key, messages):
yield chunk
return
# 方案2:从列表中找到可用模型
self.refresh_model_list()
available_free = self.get_all_free_models()
candidates = []
if model_hint and available_free:
for m in available_free:
model_name = m.replace(":free", "").split("/")[-1]
if model_hint.lower() in model_name.lower():
candidates.append(m)
if not candidates and available_free:
candidates = available_free[:10]
async with aiohttp.ClientSession() as session:
for model_id in candidates:
async for chunk in self.try_model_direct_stream(session, model_id, api_key, messages):
yield chunk
return