# Spaces: Running  (page-status residue from the web listing; not part of the program)
import asyncio
import json
import os
import time
from datetime import datetime, timedelta

import requests
from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse
from mcp.server.fastmcp import FastMCP  # MCP SDK
from starlette.routing import Mount  # used for mounting
# FastAPI application instance that the MCP app is mounted on.
app = FastAPI()

# Cache of the most recently fetched trending projects.
cached_trending = []
# Set to datetime.now() each time the cache is refreshed successfully;
# stays None until the first successful fetch.
last_updated = None
async def fetch_github_trending():
    """Fetch the most-starred GitHub repositories and refresh the module cache.

    Queries the GitHub repository search API for repositories with more than
    1000 stars, sorted by stars in descending order (10 per page), and reduces
    each result to its name, URL, star count, description and language.

    If the ``GITHUB_TOKEN`` environment variable is set, it is sent in a
    ``token`` ``Authorization`` header.

    Returns:
        The freshly fetched list of simplified project dicts. On any failure
        the error is printed and the previously cached list (possibly empty)
        is returned instead; the cache and its timestamp are left untouched.
    """
    global cached_trending, last_updated

    url = "https://api.github.com/search/repositories"
    params = {
        "q": "stars:>1000",
        "sort": "stars",
        "order": "desc",
        "per_page": 10,
    }
    headers = {}
    github_token = os.getenv("GITHUB_TOKEN")
    if github_token:
        headers["Authorization"] = f"token {github_token}"

    try:
        # requests is a blocking client: run it in a worker thread so the
        # event loop keeps serving other requests, and bound the wait with a
        # timeout so a stalled connection cannot hang the caller forever.
        response = await asyncio.to_thread(
            requests.get, url, params=params, headers=headers, timeout=10
        )
        response.raise_for_status()
        items = response.json().get("items", [])
        # Keep only the fields the endpoints expose.
        trending_projects = [
            {
                "name": item["full_name"],
                "url": item["html_url"],
                "stars": item["stargazers_count"],
                "description": item["description"],
                "language": item["language"],
            }
            for item in items
        ]
    except Exception as e:
        # Deliberate best-effort boundary: any failure (network, HTTP status,
        # malformed payload) falls back to whatever is already cached.
        print(f"Error fetching GitHub trending: {e}")
        return cached_trending if cached_trending else []

    cached_trending = trending_projects
    last_updated = datetime.now()
    return trending_projects
# Define the MCP server and its tools.
mcp_server = FastMCP(name="GithubTrending", stateless_http=True)
async def get_trending_repos(num: int = 10) -> dict:
    """Tool to get top GitHub trending repositories with stars >1000.

    Refreshes the cache first when it has never been filled or is older than
    five minutes, then returns the first ``num`` cached projects.

    Args:
        num: Maximum number of projects to return. Negative values are
            treated as 0.

    Returns:
        A dict with a single key ``"trending"`` holding the list of projects.
    """
    # NOTE(review): no tool-registration decorator is visible in this view;
    # confirm how this function is attached to the MCP server.
    if not last_updated or (datetime.now() - last_updated) > timedelta(minutes=5):
        # fetch_github_trending is a coroutine function: without ``await`` the
        # call only creates a coroutine object and the refresh never runs.
        await fetch_github_trending()
    # Clamp so a negative num cannot slice from the end of the list.
    return {"trending": cached_trending[:max(num, 0)]}
# Mount the MCP streamable-HTTP app onto the FastAPI app (under the /mcp path).
# NOTE(review): the MCP SDK documentation pairs this kind of mount with
# running the server's session manager inside the host app's lifespan; no
# such lifespan is visible in this view — confirm it is wired up elsewhere.
app.mount("/mcp", app=mcp_server.streamable_http_app())
async def trending_generator():
    """Yield server-sent events carrying the cached trending list, forever.

    Each iteration refreshes the cache when it has never been filled or is
    older than five minutes, emits one ``data:`` event, then waits 30 seconds.

    Yields:
        An SSE frame ``"data: <json>\\n\\n"`` where ``<json>`` is the cached
        project list serialized as a single-line JSON array.
    """
    while True:
        if not last_updated or (datetime.now() - last_updated) > timedelta(minutes=5):
            await fetch_github_trending()
        # Serialize as JSON rather than interpolating the list's Python repr
        # (single quotes, ``None``), which clients cannot parse as JSON.
        payload = json.dumps(cached_trending, ensure_ascii=False)
        yield f"data: {payload}\n\n"
        # Non-blocking pause: time.sleep here would freeze the event loop and
        # every other request for the full 30 seconds.
        await asyncio.sleep(30)
async def get_trending():
    """Pre-existing endpoint: return the cached trending list and its timestamp.

    Refreshes the cache first when it has never been filled or is older than
    five minutes.

    Returns:
        A dict with ``"trending"`` (the cached project list) and
        ``"last_updated"`` (ISO-8601 string, or None if no fetch has
        succeeded yet).
    """
    # Staleness is judged by the timestamp, the same way the other readers of
    # the cache do it. Keying on an empty cache instead would refetch on every
    # call after a legitimately empty result and would subtract from None if
    # the timestamp were ever unset.
    if not last_updated or (datetime.now() - last_updated) > timedelta(minutes=5):
        await fetch_github_trending()
    return {
        "trending": cached_trending,
        "last_updated": last_updated.isoformat() if last_updated else None,
    }
async def get_trending_sse():
    """Pre-existing SSE endpoint: stream the trending list as server-sent events.

    Returns:
        A StreamingResponse fed by trending_generator(), served as
        ``text/event-stream`` with caching disabled and keep-alive requested.
    """
    event_stream = trending_generator()
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        event_stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )
async def startup_event():
    """Initialize the data at startup by running one fetch of the trending list."""
    # NOTE(review): no startup-hook registration is visible in this view;
    # confirm how this coroutine is attached to the application.
    initial_fetch = fetch_github_trending()
    await initial_fetch