der-opt-worker / app.py
tianruci's picture
Update app.py
d4f0d02 verified
raw
history blame
3.09 kB
import os
import time
import requests
from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse
from datetime import datetime, timedelta
from starlette.routing import Mount # 用于挂载
from mcp.server.fastmcp import FastMCP # 导入MCP SDK
app = FastAPI()
# 缓存最新获取的热门项目
cached_trending = []
last_updated = None
async def fetch_github_trending():
"""获取 GitHub 热门项目"""
global cached_trending, last_updated
url = "https://api.github.com/search/repositories"
params = {
"q": "stars:>1000",
"sort": "stars",
"order": "desc",
"per_page": 10
}
headers = {}
github_token = os.getenv("GITHUB_TOKEN")
if github_token:
headers["Authorization"] = f"token {github_token}"
try:
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
items = response.json().get("items", [])
# 简化项目信息
trending_projects = []
for item in items:
trending_projects.append({
"name": item["full_name"],
"url": item["html_url"],
"stars": item["stargazers_count"],
"description": item["description"],
"language": item["language"]
})
cached_trending = trending_projects
last_updated = datetime.now()
return trending_projects
except Exception as e:
print(f"Error fetching GitHub trending: {e}")
return cached_trending if cached_trending else []
# 定义MCP服务器和工具
mcp_server = FastMCP(name="GithubTrending", stateless_http=True)
@mcp_server.tool()
def get_trending_repos(num: int = 10) -> dict:
"""Tool to get top GitHub trending repositories with stars >1000"""
if not last_updated or (datetime.now() - last_updated) > timedelta(minutes=5):
fetch_github_trending()
return {"trending": cached_trending[:num]}
# 挂载MCP到FastAPI(使用/mcp路径)
app.mount("/mcp", app=mcp_server.streamable_http_app())
async def trending_generator():
"""SSE 事件生成器"""
while True:
if not last_updated or (datetime.now() - last_updated) > timedelta(minutes=5):
await fetch_github_trending()
yield f"data: {cached_trending}\n\n"
time.sleep(30)
@app.get("/trending")
async def get_trending():
"""原有端点"""
if not cached_trending or (datetime.now() - last_updated) > timedelta(minutes=5):
await fetch_github_trending()
return {"trending": cached_trending, "last_updated": last_updated.isoformat() if last_updated else None}
@app.get("/trending-sse")
async def get_trending_sse():
"""原有SSE端点"""
return StreamingResponse(
trending_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
@app.on_event("startup")
async def startup_event():
"""启动时初始化数据"""
await fetch_github_trending()